在批量数据处理任务中,同时使用Polars加载多个同格式数据文件,并为合并后的数据集添加自定义元数据列是常见需求,这类操作可以帮我们快速区分不同来源的数据,后续分析时也能更方便做分组统计。

准备工作
首先需要确保已经安装Polars库,如果未安装可以通过包管理工具完成安装:
# 安装Polars库 pip install polars
本文示例将使用CSV格式的文件作为演示,假设我们有三个同结构的销售数据CSV文件,分别存放在data目录下,文件内容都包含order_id、product_name、sale_amount三个字段。
使用Polars批量加载多文件
加载同目录下的所有目标文件
Polars提供了scan_csv方法配合glob路径匹配,可以高效批量读取多个CSV文件,相比逐个读取再合并的方式,这种方式在内存占用和读取速度上都有优势。
import polars as pl
import glob
# 匹配data目录下所有CSV文件
file_paths = glob.glob("data/*.csv")
# 批量扫描并读取所有CSV文件,合并为一个LazyFrame
lazy_df = pl.scan_csv(file_paths)
# 执行计算,转换为普通DataFrame
df = lazy_df.collect()
print(df.head())
处理不同分隔符的文件
如果文件不是CSV格式,或者使用了其他分隔符,可以更换对应的扫描方法,比如读取TSV文件可以使用scan_csv并指定分隔符参数:
# 读取TSV格式的多文件
lazy_tsv_df = pl.scan_csv(
glob.glob("data/*.tsv"),
separator="t"
)
tsv_df = lazy_tsv_df.collect()
添加自定义元数据列
添加文件来源元数据列
我们可以在加载过程中为每个文件的记录添加对应的文件名作为元数据,方便后续区分数据来源。通过with_columns方法结合pl.lit可以实现这个需求。
# 逐个读取文件,添加文件名元数据后合并
df_list = []
for file_path in file_paths:
# 读取单个文件
temp_df = pl.read_csv(file_path)
# 添加自定义元数据列:文件名
temp_df = temp_df.with_columns(
pl.lit(file_path.split("/")[-1]).alias("source_file")
)
df_list.append(temp_df)
# 合并所有DataFrame
result_df = pl.concat(df_list)
print(result_df.columns)
print(result_df.head())
添加加载时间元数据列
除了文件来源,还可以添加数据加载时间作为元数据,方便追踪数据的处理节点。这里使用pl.datetime函数获取当前时间。
from datetime import datetime
# 添加加载时间元数据列
result_df = result_df.with_columns(
pl.lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")).alias("load_time")
)
print(result_df["load_time"].unique())
添加自定义标签元数据列
如果有自定义的标签需要添加,比如标记数据所属的业务线,也可以直接通过pl.lit传入固定值实现。
# 添加自定义业务线标签元数据
result_df = result_df.with_columns(
pl.lit("sales_data").alias("business_line")
)
print(result_df["business_line"].unique())
性能优化建议
当处理的文件数量较多或者单个文件体积较大时,可以优先使用LazyFrame的延迟计算特性,避免提前将所有数据加载到内存中。另外如果需要添加的元数据是基于文件内容的,可以先完成元数据列添加再进行合并,减少不必要的内存开销。
以下是结合延迟计算和元数据添加的优化示例:
# 优化后的批量加载并添加元数据示例
df_list = []
for file_path in file_paths:
# 使用LazyFrame延迟读取
lazy_temp = pl.scan_csv(file_path)
# 延迟添加元数据列
lazy_temp = lazy_temp.with_columns(
pl.lit(file_path.split("/")[-1]).alias("source_file"),
pl.lit("sales_data").alias("business_line")
)
df_list.append(lazy_temp.collect())
# 合并结果
optimized_df = pl.concat(df_list)
print(optimized_df.shape)
常见问题说明
- 如果文件结构不一致,批量加载时会报错,需要先统一所有文件的字段结构,或者单独处理结构特殊的文件。
- 添加元数据列时,pl.lit传入的值需要和DataFrame的行数匹配,固定值场景下不需要额外处理,因为Polars会自动广播。
- 如果文件路径包含中文,需要确保Python环境的编码设置正确,避免读取时出现乱码问题。