在处理大规模流式数据时,Polars凭借其基于Rust实现的底层引擎,能够提供远超传统Python数据处理工具的性能表现。很多场景下我们需要逐步接收流式数据并构建DataFrame,之后还需要将处理结果持久化到磁盘供后续使用,掌握高效的操作方法能够大幅降低资源消耗。

流式构建Polars DataFrame的高效方法
流式场景下最常见的需求是逐步接收数据片段并合并为完整的DataFrame,传统逐行追加的方式会触发大量内存重新分配,效率极低。推荐使用collect_all配合scan_csv或者使用列表缓存批量合并的方式。
批量缓存合并方案
先将流式接收的数据片段缓存到列表中,达到指定批次大小后再一次性合并,减少合并次数:
import polars as pl
def stream_build_dataframe(stream_generator, batch_size=10000):
batch_list = []
result_df = None
for data_chunk in stream_generator:
# data_chunk为字典列表或者小DataFrame
chunk_df = pl.DataFrame(data_chunk)
batch_list.append(chunk_df)
# 达到批次大小后合并
if len(batch_list) >= batch_size:
merged_batch = pl.concat(batch_list)
if result_df is None:
result_df = merged_batch
else:
result_df = pl.concat([result_df, merged_batch])
batch_list = []
# 处理剩余未合并的批次
if batch_list:
merged_batch = pl.concat(batch_list)
if result_df is None:
result_df = merged_batch
else:
result_df = pl.concat([result_df, merged_batch])
return result_df
# 模拟流式数据生成器
def mock_stream_generator(total=50000, chunk_size=1000):
for i in range(0, total, chunk_size):
yield [{"id": j, "value": j * 2} for j in range(i, min(i + chunk_size, total))]
# 执行流式构建
final_df = stream_build_dataframe(mock_stream_generator())
print(final_df.shape)
基于LazyFrame的流式扫描方案
如果流式数据本身就是按文件分片的,可以使用LazyFrame逐步扫描合并,充分利用Polars的延迟计算特性:
import polars as pl
import glob
def lazy_stream_build(file_pattern):
# 获取所有分片文件路径
file_list = sorted(glob.glob(file_pattern))
# 逐个扫描文件并合并LazyFrame
lazy_df = pl.scan_csv(file_list[0])
for file_path in file_list[1:]:
lazy_df = pl.concat([lazy_df, pl.scan_csv(file_path)])
# 触发计算返回最终DataFrame
return lazy_df.collect()
# 假设存在多个分片的csv文件
# final_df = lazy_stream_build("data_chunk_*.csv")
Polars DataFrame持久化的最佳实践
持久化时需要结合后续使用场景选择格式,不同格式的读写速度、压缩率、兼容性差异较大,以下是常见格式的对比:
| 持久化格式 | 读写速度 | 压缩率 | 适用场景 |
|---|---|---|---|
| Parquet | 快 | 高 | 大规模数据长期存储、跨工具数据交换 |
| CSV | 慢 | 低 | 小批量数据、需要人工查看内容的场景 |
| IPC(Feather V2) | 极快 | 中 | Polars/Arrow生态内快速读写、临时数据存储 |
| JSON | 慢 | 低 | 嵌套结构数据、与其他JSON接口对接的场景 |
推荐持久化操作示例
优先选择Parquet格式存储大规模数据,开启合适的压缩算法平衡速度和存储空间:
import polars as pl
# 构建示例DataFrame
df = pl.DataFrame({
"id": range(100000),
"name": [f"user_{i}" for i in range(100000)],
"score": [i % 100 for i in range(100000)]
})
# 持久化为Parquet格式,使用zstd压缩,压缩级别为3
df.write_parquet(
"output_data.parquet",
compression="zstd",
compression_level=3,
statistics=True
)
# 读取Parquet文件,使用多线程加速
read_df = pl.read_parquet("output_data.parquet", use_pyarrow=False)
print(read_df.head())
如果是Polars生态内的临时数据存储,优先选择IPC格式,读写速度比Parquet更快:
# 持久化为IPC格式
df.write_ipc("temp_data.ipc")
# 读取IPC文件
ipc_df = pl.read_ipc("temp_data.ipc")
print(ipc_df.shape)
注意事项
- 流式构建时尽量避免单个批次数据过小,否则合并次数过多会抵消批量合并的优势,建议批次大小不低于1000行。
- 持久化Parquet时如果数据包含大量字符串类型,可以开启字典编码进一步降低存储空间。
- 读取持久化文件时尽量使用Polars原生读取接口,不要走PyArrow中转,能够提升20%以上的读取速度。
- 如果流式数据存在 schema 变化的情况,需要在合并前统一字段类型,避免合并失败。