Skip to content

基类模型的生命周期

NextRec里几乎所有模型都继承自基类模型(nextrec.basic.model.BaseModel),它封装了模型训练/评估/推理等通用方法。要构建一个推荐算法模型,需要经过初始化模型,模型定义,编译,训练,验证,推理环节

初始化

以下是一个简单的DeepFM模型的初始化定义流程,在此之前我们需要定义特征,然后初始化模型参数。

python
from nextrec.basic.features import DenseFeature, SparseFeature
from nextrec.models.ranking.deepfm import DeepFM

# 定义特征
dense_features = [DenseFeature("age"), DenseFeature("income")]
sparse_features = [
    SparseFeature("user_id", vocab_size=1000000, embedding_dim=32),
    SparseFeature("item_id", vocab_size=2000000, embedding_dim=32),
]

# 创建模型
model = DeepFM(
    dense_features=dense_features,
    sparse_features=sparse_features,
    mlp_params={                    # DNN 部分的 MLP 参数
        "hidden_dims": [256, 128],  # 隐藏层维度
        "activation": "relu",       # 激活函数
        "dropout": 0.2,             # Dropout 比例
    },    
    target="click",
    task="binary",
    device="cpu",
    embedding_l2_reg=1e-5,
    dense_l2_reg=1e-4,
    session_id="deepfm_exp001",
)

参数说明

参数说明
dense_features稠密连续特征列表 List[DenseFeature]
sparse_features稀疏特征列表 List[SparseFeature]
sequence_features序列特征列表 List[SequenceFeature]
target目标列名,如 "click"["click", "buy"](多任务)
id_columns用于计算 GAUC 或推理时透传的 ID 列
task任务类型:"binary""regression"
training_mode训练模式:"pointwise""pairwise""listwise"
embedding_l1_reg嵌入向量参数 L1 正则化强度
dense_l1_reg密集向量参数 L1 正则化强度
embedding_l2_reg嵌入向量参数 L2 正则化强度,如 1e-5
dense_l2_reg密集向量参数 L2 正则化强度,如 1e-4
device计算设备:"cpu""cuda:0""mps"
session_id实验会话名,用于日志管理
distributed[无需主动设置] 是否启用 DistributedDataParallel
rank[无需主动设置] 全局排名,默认从环境变量 RANK 获取
world_size[无需主动设置] 进程数,默认从环境变量 WORLD_SIZE 获取
local_rank[无需主动设置] 本地设备的RANK
ddp_find_unused_parameters[无需主动设置] DDP 模型中是否存在未使用参数

配置训练参数

模型初始化以后,需要为训练配置一些必要的参数,例如优化器,学习率调度器,损失函数等。简单示例如下:

python
# 单任务训练
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001, "weight_decay": 1e-5},
    loss="binary_crossentropy"
)

# 带学习率调度
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001},
    scheduler="cosine",
    scheduler_params={"T_max": 10, "eta_min": 1e-6}
)

# 带warmup的学习率调度
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001},
    scheduler="cosine",
    scheduler_params={"T_max": 10},
    warmup={"epochs": 2, "start_factor": 0.1, "end_factor": 1.0}
)

# 多任务训练(带损失权重)
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001},
    loss="binary_crossentropy",
    loss_weights={"click": 1.0, "buy": 0.5}
)

# 使用 GradNorm 动态权重
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001},
    loss="binary_crossentropy",
    loss_weights={"click": 1.0, "buy": 1.0, "method": "grad_norm", "alpha": 1.5}
)

参数说明

参数说明
optimizer优化器:"adam","sgd","adamw","adagrad","rmsprop" 或pytorch优化器实例
optimizer_params优化器参数,如 {"lr": 0.001, "weight_decay": 1e-5}
scheduler学习率调度器:"step","cosine"
scheduler_params调度器参数,如 {"step_size": 10, "gamma": 0.1}
warmup可选 warmup 配置,支持 False/None/True/dictTrue 使用默认值:{"epochs":1,"start_factor":0.1,"end_factor":1.0}dict 可显式配置 enabled/epochs/start_factor/end_factor。即使不配置 scheduler 也可单独使用 warmup(结束后保持 base_lr)。
loss损失函数:"bce","weighted_bce","mse","focal","bpr"
loss_params损失函数参数,多任务时可为列表,其中为每个损失函数的参数
loss_weights多任务损失权重,如 {"click": 1.0, "buy": 0.5}时代表click任务的样本权重为1,buy任务的样本权重为0.5;使用时 {"method": "grad_norm", "alpha": 1.5, "lr": 0.025} 启用 GradNorm 进行动态权重计算
ignore_label计算损失时忽略的标签值,默认 -1 意味着当模型看到标签为-1的样本时不会计算损失

模型训练

准备工作完成以后就可以开始训练。示例代码如下:

python
# 简单训练
model.fit(
    train_data=train_df,
    epochs=10,
    batch_size=256,
    metrics=["auc", "logloss"]
)

# 带验证集和早停
model.fit(
    train_data=train_df,
    valid_data=valid_df,
    epochs=10,
    batch_size=256,
    metrics=["auc", "gauc", "logloss"],
    user_id_column="user_id",
    early_stop_patience=5,
    early_stop_monitor_task="click"
)

# 使用 W&B
model.fit(
    train_data=train_df,
    epochs=10,
    use_wandb=True,
    wandb_api="your-api-key",
    wandb_kwargs={"project": "nextrec-exp"}
)

参数说明

参数说明
train_data训练数据:DataFrame / Dict / DataLoader
valid_data验证数据:DataFrame / Dict / DataLoader
metrics评估指标,如 ['auc', 'logloss']
epochs训练轮数
shuffle是否打乱训练数据
batch_size批次大小
user_id_columnGAUC 计算所需的用户 ID 列
valid_split当未输入valid_data时,从train_data进行划分出验证集的比例,如 0.1
early_stop_patience早停轮数
early_stop_monitor_task早停参考的任务,会根据指定任务和 metrics 中第一个指标进行早停,例如 metrics=['auc', ...] 时监控 val_auc_{task}
num_workersPytorch DataLoader 进程数,提高以加速数据加载
use_tensorboard是否启用 tensorboard 日志
use_wandb是否启用 Weights & Biases 日志
use_swanlab是否启用 SwanLab 日志
wandb_apiW&B API 密钥
swanlab_apiSwanLab API 密钥
wandb_kwargswandb.init 的额外参数
swanlab_kwargsswanlab.init 的额外参数,例如{"project": "nextrec", "name": nextrec_model}
auto_ddp_sampler[无需主动设置] 自动附加 DistributedSampler
log_interval每 N 轮记录验证指标
note训练运行的备注
summary_sections打印的摘要部分,如 ["feature", "model", "train", "data"]

说明:

  • early_stop_monitor_task 仅在多任务训练下生效。
  • 当前早停监控的指标固定取 metrics 中的第一个指标。

训练产物

开始训练后,会在命令执行的当前路径下生成nextrec_logs文件夹,并在其中的session_id 路径下生成一系列训练产物,包括日志等

nextrec_logs/
└── deepfm_exp001/
    ├── checkpoint_deepfm.pt          # 模型检查点
    ├── best_deepfm.pt                # 最佳模型
    ├── metrics.json                  # 训练指标
    ├── predictions/                  # 预测结果
    └── processor.pkl                 # 数据处理器

模型评估

在模型训练的每一轮里,都会计算评估指标,这是通过基类模型的evaluate方法实现的。在训练完成后,也可以单独调用这个方法,在其他数据集上进行评估。

python
# 基础评估
metrics = model.evaluate(
    valid_df,
    metrics=["auc", "logloss"]
)
print(metrics)  # {'auc': 0.8532, 'logloss': 0.3421}

# 带 GAUC
metrics = model.evaluate(
    valid_df,
    metrics=["auc", "gauc", "logloss", "precision@10"],
    user_id_column="user_id"
)

# 多任务评估
metrics = model.evaluate(
    valid_df,
    metrics={
        "click": ["auc", "logloss"],
        "buy": ["auc", "logloss"]
    }
)

# 按场景分组评估
metrics = model.evaluate(
    valid_df,
    metrics=["auc", "precision", "recall"],
    group_by="product"
)
print(metrics["overall"])
print(metrics["grouped"])

参数说明

参数说明
data评估数据:DataFrame / Dict / DataLoader
metrics指标名称,如 ['auc', 'logloss']
batch_size批次大小
user_idsGAUC 计算的用户 ID 列名
user_id_column用户 ID 列名
group_by按列分组评估,可传单列或列名列表
num_workersPytorch DataLoader 进程数,提高以加速数据加载
thresholds二分类任务的阈值
show_data_summary是否记录数据分布的统计摘要
show_confusion_matrix是否输出混淆矩阵

模型推理

模型训练完后,可以调用模型的predict方法进行推理。示例代码如下:

python
# 简单预测
predictions = model.predict(test_df)

# 预测并保存
predictions = model.predict(
    test_df,
    save_path="./predictions.csv",
    save_format="csv"
)

# 返回 numpy 数组
predictions = model.predict(test_df, return_dataframe=False)
# {'click': array([0.85, 0.12, ...])}

参数说明

参数说明
data预测数据:文件路径 / 字典 / DataFrame / DataLoader
batch_size批次大小
save_path保存预测结果的路径
save_format保存格式:"csv""parquet"
return_dataframe是否返回 DataFrame,否则返回 NumPy 数组
stream_chunk_size大数据集流式处理时每块数据的行数,需要根据机器内存压力酌情设置
num_workersPytorch DataLoader 进程数,提高以加速数据加载
prefetch_factorPytorch Dataloader预取数据的批次数,提高以加速数据加载
num_processes流式文件推理的进程数,用于多进程推理
processor可选的 DataProcessor 用于转换输入数据
profiler[Bool] 可选的管道阶段性能分析器,设置为True时会输出推理流程中各环节计算时间

ONNX 导出与推理

ONNX(Open Neural Network Exchange)由Microsoft和Facebook共同发起,是一个开放的深度学习模型交换格式,用于在不同的深度学习框架之间导出、共享和部署模型。使用ONNX的官方推理引擎进行推理,会比原框架推理更快,适用于生产环境的部署。在NextRec中支持将模型导出为onnx格式并进行推理。

python
# 导出 ONNX
onnx_path = model.export_onnx(
    save_path="./model.onnx",
    batch_size=1
)

# 使用 ONNX 推理
import nextrec.utils.onnx as onnx

predictions = onnx.predict_onnx(
    onnx_path="./model.onnx",
    data=test_df,
    batch_size=512,
    return_dataframe=True
)

完整示例

python
import pandas as pd
from nextrec.models.ranking.deepfm import DeepFM
from nextrec.basic.features import DenseFeature, SparseFeature

# 1. 准备数据
train_df = pd.read_csv("train.csv")

# 2. 定义特征
dense_features = [DenseFeature("age")]
sparse_features = [
    SparseFeature("user_id", vocab_size=100000, embedding_dim=32),
    SparseFeature("item_id", vocab_size=200000, embedding_dim=32),
]

# 3. 创建模型
model = DeepFM(
    dense_features=dense_features,
    sparse_features=sparse_features,
    target="click",
    task="binary",
    device="cuda:0"
)

# 4. 配置训练
model.compile(
    optimizer="adam",
    optimizer_params={"lr": 0.001},
    loss="binary_crossentropy"
)

# 5. 训练
model.fit(
    train_data=train_df,
    epochs=10,
    batch_size=256,
    metrics=["auc", "logloss"],
    early_stop_patience=5
)

# 6. 评估
metrics = model.evaluate(valid_df, metrics=["auc", "logloss"])
print(f"AUC: {metrics['auc']:.4f}")

# 7. 推理
predictions = model.predict(test_df)

下一步

基于 MIT 许可证开源