在Airflow的任务调度场景中,任务失败是不可避免的情况,尤其是涉及外部接口调用、大数据处理的长时任务,失败后重新从头执行会浪费大量计算资源。通过合理配置重连机制与状态追踪,可实现任务失败后的断点续跑,大幅降低重复执行成本。

一、Airflow断点续跑的核心思路
断点续跑的实现需要两个核心支撑:一是任务失败后的自动重连重试能力,避免偶发故障导致任务直接终止;二是任务执行过程中的状态持久化,记录已完成的执行进度,重试时从断点位置继续运行,而非从头开始。
1. 重连机制的作用
重连机制主要针对网络抖动、外部服务临时不可用等偶发故障,通过配置重试次数、重试间隔,让任务在故障恢复后自动重新执行,减少人工介入的成本。Airflow原生支持任务级别的重试配置,也可在任务逻辑中自定义重连逻辑。
2. 状态追踪的作用
状态追踪用于记录任务执行到哪个阶段、处理了多少数据,通常将状态信息存储到数据库、文件或者Airflow的XCom中。当任务重试时,先读取之前保存的状态,恢复上下文后从断点继续处理,避免重复执行已完成的逻辑。
二、配置Airflow任务重连机制
1. 原生重试参数配置
Airflow的DAG和任务算子都支持重试相关参数,最常用的参数包括retries设置重试次数,retry_delay设置重试间隔,retry_exponential_backoff开启指数退避重试。
以下是一个简单的DAG示例,配置了任务最多重试3次,每次重试间隔30秒,且开启指数退避:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_data():
# 模拟可能失败的任务逻辑
import random
if random.random() < 0.5:
raise Exception("模拟任务失败")
print("任务执行成功")
with DAG(
dag_id="retry_demo",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
default_args={
"retries": 3, # 最多重试3次
"retry_delay": timedelta(seconds=30), # 基础重试间隔30秒
"retry_exponential_backoff": True, # 开启指数退避,重试间隔会逐步增大
"max_retry_delay": timedelta(minutes=5) # 最大重试间隔5分钟
}
) as dag:
task = PythonOperator(
task_id="process_data_task",
python_callable=process_data
)
2. 自定义重连逻辑
如果原生重试参数无法满足需求,比如需要针对特定异常类型才重试,可在任务逻辑中自定义重连逻辑。以下示例实现了调用外部接口时的重连逻辑,仅当遇到网络异常时才重试:
import requests
from requests.exceptions import ConnectionError, Timeout
def call_external_api_with_retry(url, max_retries=3):
retry_count = 0
while retry_count < max_retries:
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except (ConnectionError, Timeout) as e:
retry_count += 1
print(f"请求失败,第{retry_count}次重试,错误信息:{str(e)}")
if retry_count == max_retries:
raise Exception(f"请求失败,已达最大重试次数{max_retries}")
except Exception as e:
# 非网络异常直接抛出,不重试
raise e
return None
def api_task_logic():
result = call_external_api_with_retry("http://ipipp.com/api/data")
print(f"接口返回结果:{result}")
三、实现任务状态追踪与断点续跑
1. 使用XCom存储任务状态
Airflow的XCom可以在任务之间传递小体积的数据,也可以用来存储当前任务的执行状态。以下示例实现了分页处理数据的断点续跑,每次处理完一页数据后,将当前页码存储到XCom,任务重试时先读取页码再继续处理:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def process_page_data(ti):
# 从XCom读取上次处理的页码,首次执行时为None,从第一页开始
current_page = ti.xcom_pull(task_ids="process_page_data", key="current_page")
if current_page is None:
current_page = 1
total_pages = 10 # 模拟总页数
while current_page <= total_pages:
print(f"正在处理第{current_page}页数据")
# 模拟处理当前页数据的逻辑
# 处理完成后将当前页码存入XCom
ti.xcom_push(key="current_page", value=current_page)
current_page += 1
print("所有页面处理完成")
with DAG(
dag_id="state_track_demo",
start_date=days_ago(1),
schedule_interval=None
) as dag:
track_task = PythonOperator(
task_id="process_page_data",
python_callable=process_page_data,
retries=2,
retry_delay=30
)
2. 使用外部数据库存储状态
如果任务状态数据量较大,或者需要跨DAG、跨任务复用状态,可使用外部数据库存储状态。以下示例将任务处理进度存储到MySQL数据库,重试时从数据库读取进度:
import pymysql
def get_progress_from_db(task_id):
# 连接数据库读取任务进度
conn = pymysql.connect(
host="127.0.0.1",
user="root",
password="password",
database="airflow_state"
)
cursor = conn.cursor()
cursor.execute("SELECT progress FROM task_progress WHERE task_id = %s", (task_id,))
result = cursor.fetchone()
cursor.close()
conn.close()
return result[0] if result else 0
def save_progress_to_db(task_id, progress):
# 保存任务进度到数据库
conn = pymysql.connect(
host="127.0.0.1",
user="root",
password="password",
database="airflow_state"
)
cursor = conn.cursor()
# 使用REPLACE INTO实现存在则更新,不存在则插入
cursor.execute(
"REPLACE INTO task_progress (task_id, progress) VALUES (%s, %s)",
(task_id, progress)
)
conn.commit()
cursor.close()
conn.close()
def process_with_db_state(ti):
task_id = ti.task_id
# 读取已处理的进度
processed_count = get_progress_from_db(task_id)
total_count = 100 # 模拟总处理量
while processed_count < total_count:
print(f"正在处理第{processed_count + 1}条数据")
processed_count += 1
# 每处理10条数据保存一次进度
if processed_count % 10 == 0:
save_progress_to_db(task_id, processed_count)
# 处理完成后保存最终进度
save_progress_to_db(task_id, processed_count)
print("数据处理完成")
四、注意事项
- 状态存储的粒度需要合理,过于频繁的状态保存会增加IO开销,过于稀疏则会导致重试时重复处理较多内容。
- 重连机制需要设置最大重试次数上限,避免任务因永久性故障无限重试,占用调度资源。
- 如果任务涉及幂等性操作,断点续跑的逻辑需要保证重复执行不会产生副作用,比如插入数据前先判断数据是否已存在。
- 对于使用
KubernetesPodOperator等容器化算子的任务,状态存储需要放在容器外部,避免容器重启后状态丢失。