协同过滤是推荐系统中最经典的算法分支,核心思想是通过分析用户的历史行为数据,找到兴趣相似的用户或者相似的商品,从而为目标用户生成个性化推荐。传统的协同过滤算法在处理大规模稀疏数据时效果会明显下降,而矩阵分解算法通过将用户-商品评分矩阵分解为低维的用户隐向量和商品隐向量,能有效缓解稀疏性问题,提升推荐准确率。

矩阵分解算法核心原理
矩阵分解的核心是将原始的m×n用户-商品评分矩阵R,分解为两个低维矩阵的乘积:R ≈ P × Q^T,其中P是m×k的用户隐向量矩阵,Q是n×k的商品隐向量矩阵,k是隐向量的维度,通常远小于m和n。用户u对商品i的预测评分可以通过两个隐向量的内积计算得到:r̂_ui = p_u · q_i。
模型训练的目标是最小化预测评分和实际评分的误差,通常使用均方误差作为损失函数,同时加入正则化项防止过拟合,损失函数公式如下:
L = Σ(u,i)∈K (r_ui - p_u·q_i)² + λ(||p_u||² + ||q_i||²)
其中K是已知评分的集合,λ是正则化系数。
Python实现步骤
1. 准备数据集
我们使用MovieLens的简化版数据集作为示例,数据集包含用户ID、电影ID、评分三个核心字段,下面是数据加载的代码:
import numpy as np
import pandas as pd
# 加载数据,假设数据文件为ratings.csv,包含user_id,movie_id,rating三列
data = pd.read_csv("ratings.csv")
# 构建用户-商品评分矩阵,缺失值用0填充
user_ids = data["user_id"].unique()
movie_ids = data["movie_id"].unique()
user_index = {uid: idx for idx, uid in enumerate(user_ids)}
movie_index = {mid: idx for idx, mid in enumerate(movie_ids)}
R = np.zeros((len(user_ids), len(movie_ids)))
for row in data.itertuples():
u_idx = user_index[row.user_id]
m_idx = movie_index[row.movie_id]
R[u_idx, m_idx] = row.rating
2. 实现矩阵分解模型
我们使用随机梯度下降法来优化损失函数,更新用户隐向量和商品隐向量:
class MatrixFactorization:
def __init__(self, R, k=10, learning_rate=0.01, reg_param=0.01, epochs=50):
self.R = R # 原始评分矩阵
self.k = k # 隐向量维度
self.lr = learning_rate # 学习率
self.reg = reg_param # 正则化系数
self.epochs = epochs # 迭代次数
self.m, self.n = R.shape # 用户数、商品数
# 初始化用户隐向量和商品隐向量,使用随机小值
self.P = np.random.normal(scale=0.1, size=(self.m, self.k))
self.Q = np.random.normal(scale=0.1, size=(self.n, self.k))
def fit(self):
# 获取所有已知评分的位置
known_ratings = np.where(self.R != 0)
for epoch in range(self.epochs):
total_error = 0
for u, i in zip(known_ratings[0], known_ratings[1]):
actual_r = self.R[u, i]
pred_r = np.dot(self.P[u], self.Q[i])
error = actual_r - pred_r
total_error += error ** 2
# 更新隐向量
self.P[u] += self.lr * (error * self.Q[i] - self.reg * self.P[u])
self.Q[i] += self.lr * (error * self.P[u] - self.reg * self.Q[i])
# 计算带正则化的总损失
loss = total_error + self.reg * (np.sum(self.P ** 2) + np.sum(self.Q ** 2))
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}, Loss: {loss:.4f}")
def predict(self, user_idx, item_idx):
# 预测指定用户和商品的评分
return np.dot(self.P[user_idx], self.Q[item_idx])
def recommend(self, user_idx, top_n=5):
# 为用户生成top_n推荐
scores = np.dot(self.P[user_idx], self.Q.T)
# 过滤已经评分过的商品
rated_items = np.where(self.R[user_idx] != 0)[0]
scores[rated_items] = -1
# 取评分最高的top_n个商品
top_items = np.argsort(scores)[::-1][:top_n]
return top_items
3. 模型训练与推荐
初始化模型并训练,之后为目标用户生成推荐:
# 初始化模型,隐向量维度设为20,迭代100次
model = MatrixFactorization(R, k=20, learning_rate=0.005, reg_param=0.02, epochs=100)
model.fit()
# 为用户0生成5个推荐
user_idx = 0
recommend_items = model.recommend(user_idx, top_n=5)
print(f"为用户{user_ids[user_idx]}推荐的5个商品索引:{recommend_items}")
# 如果需要映射回原始商品ID
original_movie_ids = [movie_ids[idx] for idx in recommend_items]
print(f"对应原始商品ID:{original_movie_ids}")
模型评估方法
我们可以使用均方根误差(RMSE)和平均绝对误差(MAE)来评估模型的预测效果,将数据集划分为训练集和测试集后计算指标:
from sklearn.model_selection import train_test_split
# 将评分数据拆分为训练集和测试集
def split_data(data, test_ratio=0.2):
train_data = data.copy()
test_data = pd.DataFrame(columns=data.columns)
for user_id in data["user_id"].unique():
user_ratings = data[data["user_id"] == user_id]
if len(user_ratings) < 2:
train_data = train_data[train_data["user_id"] != user_id]
test_data = pd.concat([test_data, user_ratings])
continue
train, test = train_test_split(user_ratings, test_size=test_ratio, random_state=42)
train_data = train_data[train_data["user_id"] != user_id]
train_data = pd.concat([train_data, train])
test_data = pd.concat([test_data, test])
return train_data, test_data
train_data, test_data = split_data(data)
# 用训练数据重新构建评分矩阵并训练模型
train_user_index = {uid: idx for idx, uid in enumerate(train_data["user_id"].unique())}
train_movie_index = {mid: idx for idx, mid in enumerate(train_data["movie_id"].unique())}
R_train = np.zeros((len(train_user_index), len(train_movie_index)))
for row in train_data.itertuples():
u_idx = train_user_index[row.user_id]
m_idx = train_movie_index[row.movie_id]
R_train[u_idx, m_idx] = row.rating
model_eval = MatrixFactorization(R_train, k=20, learning_rate=0.005, reg_param=0.02, epochs=100)
model_eval.fit()
# 计算测试集的RMSE和MAE
rmse = 0
mae = 0
count = 0
for row in test_data.itertuples():
u_idx = train_user_index.get(row.user_id)
m_idx = train_movie_index.get(row.movie_id)
if u_idx is None or m_idx is None:
continue
pred_r = model_eval.predict(u_idx, m_idx)
actual_r = row.rating
rmse += (pred_r - actual_r) ** 2
mae += abs(pred_r - actual_r)
count += 1
rmse = np.sqrt(rmse / count)
mae = mae / count
print(f"测试集RMSE:{rmse:.4f},MAE:{mae:.4f}")
算法优化方向
基础的矩阵分解算法还有很多可优化的空间,比如加入用户偏置和商品偏置,考虑不同用户的评分习惯差异和不同商品的基础评分差异;或者使用交替最小二乘法(ALS)替代随机梯度下降,更适合并行计算;还可以结合内容特征,实现混合推荐系统,进一步提升推荐效果。