Kedro作为模块化数据管道框架,提供了标准化的数据目录管理机制,而Streamlit可以快速构建交互式数据Web应用,将二者集成能够实现数据管道与应用展示的无缝衔接。动态数据目录作为Kedro的重要特性,支持根据运行参数、环境变量等动态调整数据读写路径,非常适合在Web应用中适配不同用户、不同场景的数据需求。
核心概念说明
在正式集成之前,需要先明确几个核心概念:
- Kedro数据目录:用于管理数据管道中所有数据的读写配置,支持多种数据源类型,默认通过
conf/base/catalog.yml文件定义静态数据路径。 - 动态数据目录:指在Kedro运行时,通过参数、环境变量等方式动态修改数据目录的读写路径,而非使用固定的配置文件路径。
- Streamlit会话状态:Streamlit提供的会话级状态管理功能,可以在用户会话期间保存临时数据,适合存储动态配置参数。
环境准备与基础集成
首先完成基础环境搭建,安装必要的依赖包:
# 安装kedro和streamlit pip install kedro streamlit # 创建kedro项目 kedro new --starter=pandas-iris # 进入项目目录 cd <你的项目名>
基础集成只需要将Kedro的上下文加载到Streamlit应用中即可,示例代码如下:
import streamlit as st
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from pathlib import Path
# 项目根路径
project_path = Path.cwd()
# 启动kedro项目
bootstrap_project(project_path)
# 创建kedro会话
with KedroSession.create(project_path=project_path) as session:
context = session.load_context()
st.write("Kedro上下文加载成功")
动态数据目录实现方案
要实现动态数据目录,核心是修改Kedro数据目录的加载逻辑,支持从Streamlit的输入参数中获取路径配置。以下是两种常用的实现方式:
方式一:通过运行时参数动态修改
在Kedro会话创建时,传入自定义的参数,覆盖默认的数据目录配置:
import streamlit as st
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from pathlib import Path
import yaml
# 项目根路径
project_path = Path.cwd()
bootstrap_project(project_path)
# streamlit侧边栏输入动态数据路径
st.sidebar.header("动态数据目录配置")
data_input_path = st.sidebar.text_input("输入数据路径", value="data/01_raw")
data_output_path = st.sidebar.text_input("输出数据路径", value="data/02_intermediate")
# 自定义动态目录配置
def get_dynamic_catalog():
# 基础目录配置
base_catalog = {
"iris_data": {
"type": "pandas.CSVDataSet",
"filepath": f"{data_input_path}/iris.csv"
},
"processed_iris": {
"type": "pandas.CSVDataSet",
"filepath": f"{data_output_path}/processed_iris.csv"
}
}
return base_catalog
# 创建会话时传入动态目录
with KedroSession.create(
project_path=project_path,
# 传入自定义配置覆盖默认catalog
extra_params={"catalog": get_dynamic_catalog()}
) as session:
context = session.load_context()
# 获取动态目录实例
catalog = context.catalog
if st.button("加载输入数据"):
try:
df = catalog.load("iris_data")
st.dataframe(df.head())
st.success(f"成功从动态路径加载数据:{data_input_path}/iris.csv")
except Exception as e:
st.error(f"数据加载失败:{str(e)}")
方式二:基于环境变量适配多场景
如果需要根据不同运行环境(开发、测试、生产)切换数据目录,可以通过环境变量实现动态配置:
import streamlit as st
import os
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from pathlib import Path
project_path = Path.cwd()
bootstrap_project(project_path)
# 从环境变量或streamlit输入获取环境标识
env = st.sidebar.selectbox("运行环境", ["dev", "test", "prod"])
# 设置环境变量,kedro会自动读取conf/{env}/下的配置
os.environ["KEDRO_ENV"] = env
with KedroSession.create(project_path=project_path) as session:
context = session.load_context()
catalog = context.catalog
st.write(f"当前运行环境:{env}")
st.write(f"数据目录配置路径:conf/{env}/catalog.yml")
if st.button("查看当前目录配置"):
# 展示当前加载的目录信息
for dataset_name in catalog.list():
dataset = catalog._get_dataset(dataset_name)
st.text(f"数据集:{dataset_name},路径:{dataset._filepath}")
实际应用优化技巧
在实际开发中,还可以结合以下技巧提升动态数据目录的使用效率:
- 将常用路径配置保存到Streamlit的
session_state中,避免重复输入:
# 初始化session_state
if "data_path" not in st.session_state:
st.session_state.data_path = "data/01_raw"
# 使用session_state存储路径
new_path = st.text_input("数据路径", value=st.session_state.data_path)
if st.button("保存路径"):
st.session_state.data_path = new_path
st.success("路径已保存到会话状态")
- 对动态路径做合法性校验,避免无效路径导致程序报错:
from pathlib import Path
def validate_data_path(path_str):
path = Path(path_str)
if not path.exists():
# 尝试创建目录
try:
path.mkdir(parents=True, exist_ok=True)
return True, f"路径不存在已自动创建:{path_str}"
except Exception as e:
return False, f"路径创建失败:{str(e)}"
return True, "路径合法"
# 在加载数据前校验
is_valid, msg = validate_data_path(st.session_state.data_path)
if not is_valid:
st.warning(msg)
常见问题与解决方案
| 问题场景 | 解决方案 |
|---|---|
| 动态路径修改后目录未更新 | 每次修改路径后重新创建Kedro会话,或者使用catalog的add()方法动态添加数据集配置 |
| 多用户访问时路径冲突 | 为每个用户会话生成唯一的临时路径,结合Streamlit的session_id区分不同用户的目录 |
| 数据加载速度慢 | 对常用动态数据做缓存,使用Streamlit的@st.cache_data装饰器减少重复加载 |
通过以上方式,就可以在Kedro与Streamlit集成时高效实现动态数据目录,让数据Web应用具备更灵活的数据适配能力,满足不同场景下的数据读写需求。