在企业数据处理体系中,数据仓库承担着存储整合结构化数据的核心作用,而ETL流程是将分散的原始数据转化为可用数据的关键步骤,Airflow作为开源调度工具,能够高效管理ETL任务的执行顺序与依赖关系,三者结合可以搭建稳定可靠的数据处理 pipeline。

数据仓库分层设计
用Python构建数据仓库时,通常采用分层架构降低数据耦合度,常见分层如下:
- ODS层:原始数据层,存储未经处理的原始业务数据,保持数据原貌
- DWD层:明细数据层,对ODS层数据进行清洗、过滤、脱敏等处理,形成统一格式的明细数据
- DWS层:汇总数据层,基于DWD层数据按照业务主题进行轻度汇总
- ADS层:应用数据层,直接对接业务分析需求,提供可直接使用的数据集
Python实现ETL核心逻辑
ETL包含抽取(Extract)、转换(Transform)、加载(Load)三个环节,下面用Python实现各环础逻辑。
数据抽取
从MySQL数据库抽取原始订单数据,使用pymysql库实现:
import pymysql
import pandas as pd
def extract_order_data():
# 建立数据库连接
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='business_db',
charset='utf8mb4'
)
# 执行查询语句
sql = "SELECT order_id, user_id, order_amount, create_time FROM raw_orders"
df = pd.read_sql(sql, conn)
conn.close()
return df
数据转换
对抽取到的原始数据进行清洗,处理空值、格式统一等逻辑:
def transform_order_data(df):
# 处理空值,删除订单金额为空的记录
df = df.dropna(subset=['order_amount'])
# 统一时间格式
df['create_time'] = pd.to_datetime(df['create_time'])
# 新增日期字段,方便后续按天汇总
df['order_date'] = df['create_time'].dt.date
return df
数据加载
将转换后的数据加载到数据仓库的DWD层表中:
from sqlalchemy import create_engine
def load_order_data(df):
# 创建数据仓库连接引擎
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/data_warehouse')
# 写入DWD层订单明细表,如果表存在则追加数据
df.to_sql(
name='dwd_order_detail',
con=engine,
if_exists='append',
index=False
)
engine.dispose()
Airflow核心概念与调度配置
Airflow通过DAG(有向无环图)定义任务流程,核心组件包含DAG、Operator、Task、Scheduler等,下面介绍关键配置。
安装与初始化
首先通过pip安装Airflow,初始化相关配置:
# 安装Airflow
pip install apache-airflow
# 初始化Airflow数据库
airflow db init
# 创建管理员用户
airflow users create
--username admin
--firstname admin
--lastname admin
--role Admin
--email test@ipipp.com
--password admin123
# 启动调度器与Web服务
airflow scheduler -D
airflow webserver -D
编写ETL调度DAG
将前面实现的ETL函数封装为Airflow任务,定义完整的调度流程:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 导入前面定义的ETL函数
from etl_scripts import extract_order_data, transform_order_data, load_order_data
# 定义DAG默认参数
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['alert@ipipp.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
# 创建DAG实例,设置每天凌晨1点执行
with DAG(
dag_id='etl_order_to_dwd',
default_args=default_args,
description='订单数据ETL到DWD层调度任务',
schedule_interval='0 1 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'data_warehouse']
) as dag:
# 定义抽取任务
extract_task = PythonOperator(
task_id='extract_order_data',
python_callable=extract_order_data
)
# 定义转换任务
transform_task = PythonOperator(
task_id='transform_order_data',
python_callable=transform_order_data
)
# 定义加载任务
load_task = PythonOperator(
task_id='load_order_data',
python_callable=load_order_data
)
# 设置任务依赖:抽取完成后再转换,转换完成后再加载
extract_task >> transform_task >> load_task
流程验证与问题排查
完成DAG编写后,将脚本放到Airflow的dags目录下,在Web界面中可以看到对应DAG。手动触发执行后,可以在任务实例页面查看每个任务的执行状态、日志。如果出现任务失败,可以通过日志定位问题,比如数据库连接失败、数据格式异常等,调整对应逻辑后重新执行即可。
通过Python实现ETL逻辑,结合Airflow的调度能力,可以搭建出稳定可扩展的数据仓库处理流程,后续可以根据业务需求扩展更多ETL任务,调整调度策略,满足复杂的数据处理场景需求。