Skip to content

数据加载

数据预处理完成后,需要构建一个数据加载器进行分批的数据加载和读取。在NextRec中对Pytorch的DataLoader进行了封装,提供了RecDataLoader作为统一的推荐系统数据加载接口,支持内存数据和大规模流式数据。

通过from nextrec.data.dataloader import RecDataLoader来进行调用。

核心能力

RecDataLoader提供了对多种数据格式的支持,包括DataFrame、Dict、文件路径(csv/parquet),并针对不同的场景,提供了内存加载与流式加载两种方式,以及基于分布式/多进程训练的自动数据分片。

示例代码

提供了两种加载模式的示例代码:

内存加载模式

适合中小规模数据,可直接加载到内存:

python
from nextrec.data.dataloader import RecDataLoader

rec_loader = RecDataLoader(
    dense_features=dense_features,
    sparse_features=sparse_features,
    sequence_features=sequence_features,
    target=["label"],
    id_columns=['user_id'],
    processor=fitted_processor    # 支持加载已经拟合过的DataProcessor,在训练同时对数据做预处理变换
)

# 从 DataFrame 创建
loader = rec_loader.create_dataloader(
    data=df,
    batch_size=256,
    shuffle=True,
    streaming=False,
    num_workers=4

)

流式加载模式

适合大规模数据,按块流式读取:

python
from nextrec.data.dataloader import RecDataLoader

rec_loader = RecDataLoader(
    dense_features=dense_features,
    sparse_features=sparse_features,
    sequence_features=sequence_features,
    target=["label"],
    id_columns=['user_id'],
    processor=fitted_processor    # 支持加载已经拟合过的DataProcessor,在训练同时对数据做预处理变换
)

loader = rec_loader.create_dataloader(
    data="/path/to/data_dir",       # 文件或目录
    streaming=True,                 # 开启流式
    chunk_size=20000,               # 加载数据时每个 chunk 的大小,也就是流式训练下的一个batch size
    num_workers=4
)

参数说明

RecDataLoader对外提供了两个方法:初始化方法init与构造方法create_dataloader

初始化

通过实例化RecDataLoader传入参数,为需要的特征进行注册。

参数说明
dense_features稠密连续特征列表 List[DenseFeature]
sparse_features稀疏特征列表 List[SparseFeature]
sequence_features序列特征列表 List[SequenceFeature]
target目标列名,如 "click"["click", "buy"](多任务)
id_columns数据中的id列
processor[可选] fit后的DataProcessor,有值时会在加载数据同时进行数据预处理

构造方法

通过使用RecDataLoader.create_dataloader方法,来返回标准的Pytorch DataLoader实例。

参数说明
batch_size批次大小
streaming是否开启流式模式
chunk_size流式模式时每个 chunk 的行数
num_workers数据加载线程数
prefetch_factor预取因子
shuffle是否 shuffle
shard_rank[通常无需主动设置] 分布式训练分片 rank
shard_count[通常无需主动设置] 分布式训练分片数
sampler[通常无需主动设置] 分布式sampler

NextRec CLI 集成

在命令行工具NextRec CLI中,通过修改对应的配置文件,能够调整数据加载器的参数。例如在train.yaml中配置data以及dataloader参数

yaml
data:
  train_path: ./data/train
  streaming: true        # 对应 streaming=True

dataloader:
  batch_size: 1024
  shuffle: true
  chunk_size: 20000       
  num_workers: 6
  prefetch_factor: 2

完整示例

以下是一个RecDataLoader的简单完整使用示例代码:

python
from nextrec.data.dataloader import RecDataLoader
from nextrec.basic.features import DenseFeature, SparseFeature, SequenceFeature

# 1. 定义特征
dense_features = [DenseFeature("age")]
sparse_features = [
    SparseFeature("user_id", vocab_size=100000, embedding_dim=32),
    SparseFeature("item_id", vocab_size=200000, embedding_dim=32),
]
sequence_features = [
    SequenceFeature("hist_item_ids", vocab_size=200000, embedding_dim=32, max_len=50)
]

# 2. 创建 DataLoader
rec_loader = RecDataLoader(
    dense_features=dense_features,
    sparse_features=sparse_features,
    sequence_features=sequence_features,
    target=["label"]
)

# 3. 创建 loader(支持 DataFrame 或文件路径)
train_loader = rec_loader.create_dataloader(
    data=train_df,  # 或文件路径,例如 "/mnt/home/train_df"
    batch_size=256,
    shuffle=True,
    num_workers=4
)

下一步

基于 MIT 许可证开源