如何用Python构建数据仓库并详解Airflow调度ETL流程

来源:编程网作者:日本程序员头衔:程序员
导读:本期聚焦于小伙伴创作的《如何用Python构建数据仓库并详解Airflow调度ETL流程》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何用Python构建数据仓库并详解Airflow调度ETL流程》有用,将其分享出去将是对创作者最好的鼓励。

在企业数据处理体系中,数据仓库承担着存储整合结构化数据的核心作用,而ETL流程是将分散的原始数据转化为可用数据的关键步骤,Airflow作为开源调度工具,能够高效管理ETL任务的执行顺序与依赖关系,三者结合可以搭建稳定可靠的数据处理 pipeline。

如何用Python构建数据仓库并详解Airflow调度ETL流程

数据仓库分层设计

用Python构建数据仓库时,通常采用分层架构降低数据耦合度,常见分层如下:

  • ODS层:原始数据层,存储未经处理的原始业务数据,保持数据原貌
  • DWD层:明细数据层,对ODS层数据进行清洗、过滤、脱敏等处理,形成统一格式的明细数据
  • DWS层:汇总数据层,基于DWD层数据按照业务主题进行轻度汇总
  • ADS层:应用数据层,直接对接业务分析需求,提供可直接使用的数据集

Python实现ETL核心逻辑

ETL包含抽取(Extract)、转换(Transform)、加载(Load)三个环节,下面用Python实现各环础逻辑。

数据抽取

从MySQL数据库抽取原始订单数据,使用pymysql库实现:

import pymysql
import pandas as pd

def extract_order_data():
    # 建立数据库连接
    conn = pymysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='business_db',
        charset='utf8mb4'
    )
    # 执行查询语句
    sql = "SELECT order_id, user_id, order_amount, create_time FROM raw_orders"
    df = pd.read_sql(sql, conn)
    conn.close()
    return df

数据转换

对抽取到的原始数据进行清洗,处理空值、格式统一等逻辑:

def transform_order_data(df):
    # 处理空值,删除订单金额为空的记录
    df = df.dropna(subset=['order_amount'])
    # 统一时间格式
    df['create_time'] = pd.to_datetime(df['create_time'])
    # 新增日期字段,方便后续按天汇总
    df['order_date'] = df['create_time'].dt.date
    return df

数据加载

将转换后的数据加载到数据仓库的DWD层表中:

from sqlalchemy import create_engine

def load_order_data(df):
    # 创建数据仓库连接引擎
    engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/data_warehouse')
    # 写入DWD层订单明细表,如果表存在则追加数据
    df.to_sql(
        name='dwd_order_detail',
        con=engine,
        if_exists='append',
        index=False
    )
    engine.dispose()

Airflow核心概念与调度配置

Airflow通过DAG(有向无环图)定义任务流程,核心组件包含DAG、Operator、Task、Scheduler等,下面介绍关键配置。

安装与初始化

首先通过pip安装Airflow,初始化相关配置:

# 安装Airflow
pip install apache-airflow
# 初始化Airflow数据库
airflow db init
# 创建管理员用户
airflow users create 
    --username admin 
    --firstname admin 
    --lastname admin 
    --role Admin 
    --email test@ipipp.com 
    --password admin123
# 启动调度器与Web服务
airflow scheduler -D
airflow webserver -D

编写ETL调度DAG

将前面实现的ETL函数封装为Airflow任务,定义完整的调度流程:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 导入前面定义的ETL函数
from etl_scripts import extract_order_data, transform_order_data, load_order_data

# 定义DAG默认参数
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['alert@ipipp.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# 创建DAG实例,设置每天凌晨1点执行
with DAG(
    dag_id='etl_order_to_dwd',
    default_args=default_args,
    description='订单数据ETL到DWD层调度任务',
    schedule_interval='0 1 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'data_warehouse']
) as dag:
    # 定义抽取任务
    extract_task = PythonOperator(
        task_id='extract_order_data',
        python_callable=extract_order_data
    )
    # 定义转换任务
    transform_task = PythonOperator(
        task_id='transform_order_data',
        python_callable=transform_order_data
    )
    # 定义加载任务
    load_task = PythonOperator(
        task_id='load_order_data',
        python_callable=load_order_data
    )
    # 设置任务依赖:抽取完成后再转换,转换完成后再加载
    extract_task >> transform_task >> load_task

流程验证与问题排查

完成DAG编写后,将脚本放到Airflow的dags目录下,在Web界面中可以看到对应DAG。手动触发执行后,可以在任务实例页面查看每个任务的执行状态、日志。如果出现任务失败,可以通过日志定位问题,比如数据库连接失败、数据格式异常等,调整对应逻辑后重新执行即可。

通过Python实现ETL逻辑,结合Airflow的调度能力,可以搭建出稳定可扩展的数据仓库处理流程,后续可以根据业务需求扩展更多ETL任务,调整调度策略,满足复杂的数据处理场景需求。

Python数据仓库ETLAirflow调度流程修改时间:2026-06-21 07:00:38

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。