在Gevent的协程并发模型中,多个协程如果直接共享同一个Socket连接,很容易出现数据发送接收错乱、连接意外关闭等问题,这也是很多开发者在写Gevent网络程序时会遇到的典型问题。下面我们先看一个错误的共享示例,再逐步介绍正确的实现方式。

为什么直接共享Socket连接会有问题
Socket本身是全双工的,但是它的读写操作不是原子性的。在Gevent中,多个协程是交替执行的,如果一个协程刚发送了一半数据,就被切换到另一个协程执行发送操作,就会导致两个请求的数据混在一起,服务端收到的是错乱的报文,同样接收数据的时候也可能出现多个协程抢读同一份响应,导致数据归属错误。
下面是一个错误的共享示例,多个协程直接操作同一个Socket:
import gevent
from gevent import socket
def wrong_share(sock, msg):
# 多个协程直接调用send,可能出现数据交错
sock.send(msg.encode())
data = sock.recv(1024)
print(f"收到响应: {data.decode()}")
def main():
sock = socket.socket()
sock.connect(("127.0.0.1", 8080))
tasks = []
for i in range(3):
# 启动3个协程共享同一个socket
t = gevent.spawn(wrong_share, sock, f"请求{i}")
tasks.append(t)
gevent.joinall(tasks)
sock.close()
if __name__ == "__main__":
main()方案一:使用请求队列做中心化分发
这种方案的核心是所有协程不直接操作Socket,而是把要发送的请求放到一个队列里,单独启动一个协程负责从队列取请求、通过Socket发送、接收响应,再把响应回传给对应的请求协程。这样可以保证Socket的读写操作都是串行的,避免并发冲突。
实现的时候可以用Gevent的Queue来做请求中转,每个请求带一个结果队列,发送协程处理完之后把结果放到对应的结果队列里,请求方从自己的结果队列取数据即可。
import gevent
from gevent import socket, Queue
class SocketDispatcher:
def __init__(self, sock):
self.sock = sock
self.request_queue = Queue()
self.running = True
# 启动发送协程
self.sender = gevent.spawn(self._send_loop)
def _send_loop(self):
while self.running:
# 从队列取请求,每个请求是(发送内容, 结果队列)的元组
msg, result_queue = self.request_queue.get()
try:
self.sock.send(msg.encode())
resp = self.sock.recv(1024)
result_queue.put(resp.decode())
except Exception as e:
result_queue.put(e)
def request(self, msg):
# 每个请求创建独立的结果队列
result_queue = Queue()
self.request_queue.put((msg, result_queue))
# 等待结果返回
return result_queue.get()
def close(self):
self.running = False
self.sender.join()
self.sock.close()
def worker(dispatcher, msg):
resp = dispatcher.request(msg)
print(f"请求{msg}的响应: {resp}")
def main():
sock = socket.socket()
sock.connect(("127.0.0.1", 8080))
dispatcher = SocketDispatcher(sock)
tasks = []
for i in range(3):
t = gevent.spawn(worker, dispatcher, f"请求{i}")
tasks.append(t)
gevent.joinall(tasks)
dispatcher.close()
if __name__ == "__main__":
main()方案二:构建Socket连接池
如果业务场景需要多个协程同时发起请求,单连接串行处理可能性能不够,这时候可以构建连接池,提前创建多个Socket连接,每个协程从连接池取一个空闲连接使用,用完再还回去,避免多个协程共享同一个连接。
连接池需要管理连接的创建、回收、健康检查,同时用锁或者信号量控制连接获取的逻辑,避免并发获取连接出现问题。
import gevent
from gevent import socket, lock
import queue
class SocketPool:
def __init__(self, host, port, max_size=5):
self.host = host
self.port = port
self.max_size = max_size
self.pool = queue.Queue()
self.size = 0
self.size_lock = lock.Semaphore()
def _create_sock(self):
sock = socket.socket()
sock.connect((self.host, self.port))
return sock
def get(self):
# 先尝试从池里取空闲连接
try:
return self.pool.get_nowait()
except queue.Empty:
pass
# 池里没有的话,检查是否可以创建新连接
with self.size_lock:
if self.size < self.max_size:
self.size += 1
return self._create_sock()
# 达到最大连接数,等待其他协程归还连接
return self.pool.get()
def put(self, sock):
# 简单检查连接是否可用,不可用就新建替换
try:
sock.send(b"")
self.pool.put(sock)
except:
with self.size_lock:
self.size -= 1
sock.close()
def close_all(self):
while not self.pool.empty():
try:
sock = self.pool.get_nowait()
sock.close()
except:
pass
def pool_worker(pool, msg):
sock = pool.get()
try:
sock.send(msg.encode())
resp = sock.recv(1024)
print(f"请求{msg}的响应: {resp.decode()}")
finally:
pool.put(sock)
def main():
pool = SocketPool("127.0.0.1", 8080, max_size=3)
tasks = []
for i in range(5):
t = gevent.spawn(pool_worker, pool, f"请求{i}")
tasks.append(t)
gevent.joinall(tasks)
pool.close_all()
if __name__ == "__main__":
main()方案三:使用Gevent锁同步单连接操作
如果业务场景只能使用单个Socket连接,也可以给Socket的读写操作加锁,保证同一时间只有一个协程在操作这个连接。这种方式实现简单,但是性能会比前两种方案差,因为所有操作都是串行的。
import gevent
from gevent import socket, lock
def lock_share(sock, msg, sock_lock):
with sock_lock:
# 加锁保证同一时间只有一个协程操作socket
sock.send(msg.encode())
resp = sock.recv(1024)
print(f"请求{msg}的响应: {resp.decode()}")
def main():
sock = socket.socket()
sock.connect(("127.0.0.1", 8080))
sock_lock = lock.Lock()
tasks = []
for i in range(3):
t = gevent.spawn(lock_share, sock, f"请求{i}", sock_lock)
tasks.append(t)
gevent.joinall(tasks)
sock.close()
if __name__ == "__main__":
main()方案选择建议
如果业务请求量小,只需要单个连接,用锁同步的方式最简单;如果需要多个协程并发请求,优先选择连接池方案,性能更好;如果服务端要求所有请求必须走同一个连接,就用请求队列做中心化分发的方案。不管选哪种方案,都要注意连接异常的处理,比如连接断开后要及时重连,避免程序报错。
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 请求队列分发 | 必须单连接、请求有序 | 逻辑简单,无并发冲突 | 性能低,串行处理请求 |
| 连接池 | 高并发请求、允许多连接 | 性能好,支持并发请求 | 实现稍复杂,需要管理连接 |
| 锁同步单连接 | 低并发、单连接场景 | 实现最简单 | 性能差,串行操作 |
GeventSocket连接共享协程并发连接池Python并发编程修改时间:2026-05-31 23:40:40