Python爬虫调度与自动化的核心需求
Python爬虫在实际落地场景中,往往不是单次执行就结束,很多业务需要每天定时爬取最新数据,或者同时运行多个不同站点的爬取任务。如果靠手动触发脚本,不仅效率低,还容易出现遗漏。这时候就需要引入任务调度工具,而Celery是Python生态中非常成熟的分布式任务队列,能够很好地支撑爬虫的调度与自动化需求。

Celery的核心组件
在开始实践前,需要先了解Celery的几个核心部分:
- 任务生产者:负责定义需要执行的爬虫任务,将任务发送到消息队列
- 消息中间件:常用Redis、RabbitMQ,用于暂存待执行的任务,本文选用Redis作为中间件
- 任务消费者(Worker):从消息队列中获取任务并执行,也就是实际运行爬虫逻辑的进程
- 定时调度器(Beat):负责按照预设的时间规则,将定时任务发送到消息队列
- 结果存储:可选组件,用于存储任务的执行结果和状态,方便后续查询
环境准备与基础配置
首先需要安装对应的依赖包,执行以下命令:
pip install celery redis requests
接下来创建基础的Celery实例配置文件,假设项目目录为spider_schedule,新建celery_app.py文件:
from celery import Celery
import redis
# 创建Celery实例,第一个参数是应用名称,第二个是消息中间件地址
# 这里使用Redis作为消息中间件,地址为本地默认Redis服务
app = Celery(
"spider_tasks",
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/1" # 结果存储也使用Redis
)
# 可选配置,设置任务序列化方式为json,结果序列化方式也为json
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"]
)
编写爬虫任务逻辑
接下来编写具体的爬虫任务,在spider_schedule目录下新建tasks.py文件,定义需要执行的爬虫函数:
import requests
from celery_app import app
import time
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def crawl_news(self, url):
"""
爬取新闻页面的爬虫任务
:param self: Celery任务实例,用于重试等操作
:param url: 待爬取的目标URL
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # 如果状态码不是200,抛出异常
# 这里可以添加解析页面、存储数据的逻辑,简化为打印内容长度
print(f"爬取{url}成功,页面长度:{len(response.text)}")
return {"status": "success", "url": url, "length": len(response.text)}
except Exception as e:
# 任务执行失败,自动重试,最多重试3次,每次间隔60秒
raise self.retry(exc=e)
配置定时任务规则
要实现定时自动化执行,需要配置Celery Beat的定时规则。在celery_app.py中添加定时任务配置:
from celery.schedules import crontab
app.conf.beat_schedule = {
# 定时任务1:每天早上8点爬取新闻首页
"crawl_news_morning": {
"task": "tasks.crawl_news", # 任务路径,对应tasks.py中的crawl_news函数
"schedule": crontab(hour=8, minute=0), # 每天8点执行
"args": ("https://ipipp.com/news",) # 传递给任务的参数,注意是元组格式
},
# 定时任务2:每30分钟爬取一次资讯列表
"crawl_info_every_30min": {
"task": "tasks.crawl_news",
"schedule": 30 * 60, # 每30分钟执行一次,单位是秒
"args": ("https://ipipp.com/info",)
}
}
启动服务与验证
完成配置后,需要分别启动Celery Worker和Beat服务:
首先打开第一个终端,启动Worker进程,执行命令:
celery -A celery_app worker -l info -P eventlet
如果是Windows系统,需要加上-P eventlet参数,否则可能出现运行错误。如果是Linux或Mac系统,可以去掉该参数。
打开第二个终端,启动Beat定时调度器,执行命令:
celery -A celery_app beat -l info
启动后,Beat会按照预设的规则,定时将任务发送到Redis队列,Worker会自动获取任务并执行对应的爬虫逻辑。可以在Worker的终端输出中看到任务的执行日志,验证定时调度是否生效。
常见问题与优化建议
- 如果爬虫任务需要传递复杂参数,建议将参数序列化为json格式,避免传递过程中出现编码问题
- 多个爬虫任务可以定义不同的队列,通过
queue参数指定任务投递的队列,Worker也可以指定消费特定队列的任务,实现任务隔离 - 如果爬取频率过高,需要在任务中添加请求间隔,避免被目标站点封禁IP,可以在任务逻辑中使用
time.sleep()控制间隔 - 任务执行结果可以通过Celery的
AsyncResult查询,方便排查任务失败的原因
注意:爬虫开发需要遵守目标站点的robots协议,不要高频请求给站点服务器造成压力,同时不要爬取涉及隐私、版权的内容。