在分布式服务架构中,多台服务器组成服务器池共同处理客户端请求是常见方案,通过Python的异步并发能力可以实现高效的请求调度,同时结合动态负载均衡策略让服务器池的资源得到充分利用,还能保障任务流持续稳定执行,避免请求堆积或节点过载。
核心实现思路
整个调度系统的核心分为三个部分:异步请求执行模块、服务器负载监控模块、动态调度决策模块。异步请求执行模块负责非阻塞地发送请求并处理响应,负载监控模块实时采集各服务器的运行状态,调度决策模块根据监控数据调整请求分配策略,三者配合实现持续任务流的稳定调度。
异步请求执行模块
使用aiohttp库实现异步HTTP请求,避免同步请求带来的阻塞问题,提升单进程内的请求处理效率。首先封装基础的异步请求函数:
import asyncio
import aiohttp
from typing import Dict, Any
async def async_request(session: aiohttp.ClientSession, url: str, method: str = "GET", data: Dict[str, Any] = None) -> Dict[str, Any]:
"""异步发送HTTP请求"""
try:
if method.upper() == "GET":
async with session.get(url) as response:
return {
"status": response.status,
"content": await response.text(),
"url": url
}
elif method.upper() == "POST":
async with session.post(url, json=data) as response:
return {
"status": response.status,
"content": await response.text(),
"url": url
}
except Exception as e:
return {
"status": -1,
"error": str(e),
"url": url
}
服务器池与负载监控
首先需要定义服务器池的结构,记录每台服务器的基础信息和实时负载数据,负载数据包括当前活跃请求数、最近响应时间、错误率等指标:
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ServerNode:
"""服务器节点信息"""
url: str
max_concurrent: int = 10 # 最大并发请求数
active_requests: int = 0 # 当前活跃请求数
total_requests: int = 0 # 总处理请求数
error_count: int = 0 # 错误请求数
last_response_time: float = 0.0 # 最近响应时间(秒)
last_update_time: datetime = field(default_factory=datetime.now)
def get_load_score(self) -> float:
"""计算负载得分,得分越低负载越低"""
error_rate = self.error_count / self.total_requests if self.total_requests > 0 else 0
# 综合活跃请求占比、响应时间、错误率计算得分
score = (self.active_requests / self.max_concurrent) * 0.5 + self.last_response_time * 0.3 + error_rate * 0.2
return score
class ServerPool:
"""服务器池管理"""
def __init__(self):
self.servers: list[ServerNode] = []
def add_server(self, server: ServerNode):
"""添加服务器节点"""
self.servers.append(server)
def get_available_server(self) -> ServerNode:
"""获取负载最低的可用服务器"""
available_servers = [s for s in self.servers if s.active_requests < s.max_concurrent]
if not available_servers:
return None
# 按负载得分升序排序,取第一个
available_servers.sort(key=lambda x: x.get_load_score())
return available_servers[0]
def update_server_status(self, server_url: str, response_time: float, is_error: bool):
"""更新服务器状态"""
for server in self.servers:
if server.url == server_url:
server.active_requests -= 1
server.total_requests += 1
server.last_response_time = response_time
if is_error:
server.error_count += 1
server.last_update_time = datetime.now()
break
动态调度与持续任务流实现
调度器负责从任务队列中获取任务,选择负载最低的服务器执行请求,同时更新服务器状态,实现持续的任务流处理:
class RequestScheduler:
"""请求调度器"""
def __init__(self, server_pool: ServerPool, max_retry: int = 3):
self.server_pool = server_pool
self.max_retry = max_retry
self.task_queue = asyncio.Queue()
self.session = None
async def init_session(self):
"""初始化aiohttp会话"""
self.session = aiohttp.ClientSession()
async def close_session(self):
"""关闭会话"""
if self.session:
await self.session.close()
async def add_task(self, url: str, method: str = "GET", data: Dict[str, Any] = None):
"""添加任务到队列"""
await self.task_queue.put({
"url": url,
"method": method,
"data": data,
"retry_count": 0
})
async def process_task(self, task: Dict[str, Any]):
"""处理单个任务"""
server = self.server_pool.get_available_server()
if not server:
# 没有可用服务器,重新入队等待
await self.task_queue.put(task)
await asyncio.sleep(0.1)
return
# 标记服务器活跃请求数增加
server.active_requests += 1
start_time = asyncio.get_event_loop().time()
result = await async_request(self.session, server.url, task["method"], task["data"])
end_time = asyncio.get_event_loop().time()
response_time = end_time - start_time
# 更新服务器状态
is_error = result.get("status", -1) != 200
self.server_pool.update_server_status(server.url, response_time, is_error)
# 处理重试逻辑
if is_error and task["retry_count"] < self.max_retry:
task["retry_count"] += 1
await self.task_queue.put(task)
else:
# 这里可以处理最终请求结果,比如写入数据库、返回给客户端等
print(f"任务处理完成,服务器:{server.url},状态:{result.get('status')}")
async def run(self):
"""启动调度器,持续处理任务流"""
await self.init_session()
try:
while True:
task = await self.task_queue.get()
# 创建任务异步处理,不阻塞主循环
asyncio.create_task(self.process_task(task))
self.task_queue.task_done()
except asyncio.CancelledError:
await self.close_session()
使用示例
以下是完整的调度系统启动示例,模拟向服务器池发送持续的任务流:
async def main():
# 初始化服务器池,添加两台测试服务器
pool = ServerPool()
pool.add_server(ServerNode(url="http://127.0.0.1:8080", max_concurrent=5))
pool.add_server(ServerNode(url="http://127.0.0.1:8081", max_concurrent=8))
# 初始化调度器
scheduler = RequestScheduler(pool)
# 启动调度器任务
scheduler_task = asyncio.create_task(scheduler.run())
# 模拟添加持续任务流
for i in range(50):
await scheduler.add_task(
url=f"http://ipipp.com/api/test?id={i}",
method="GET"
)
# 等待所有任务处理完成
await scheduler.task_queue.join()
# 取消调度器运行
scheduler_task.cancel()
try:
await scheduler_task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main())
注意事项
- 服务器节点的
max_concurrent参数需要根据服务器的实际处理能力设置,避免设置过高导致服务器过载。 - 负载得分的计算逻辑可以根据实际业务场景调整权重,比如对响应时间敏感的可以调高响应时间的权重。
- 持续任务流场景下,任务队列需要设置合理的容量上限,避免内存溢出,同时可以添加队列满时的降级策略。
- 生产环境中需要添加更完善的监控和告警机制,当服务器池整体负载过高时及时扩容。