在Ray分布式计算场景中,多个Worker并发处理任务后将结果写入共享数据缓冲区,再由主线程统一读取是常见需求,不过并发写入容易引发数据竞争问题,需要通过合适的同步机制保证操作安全。

核心实现思路
要实现多Worker并发写入、主线程安全读取,核心需要解决两个问题:一是提供所有Worker都能访问的共享缓冲区,二是给写入操作加锁避免并发冲突。Ray中可以通过ray.put创建共享对象,结合Python的threading.Lock实现写入同步,主线程读取时只需要获取缓冲区的当前快照即可保证安全。
共享缓冲区的初始化
首先我们需要初始化一个共享的列表作为数据缓冲区,同时创建一个全局的锁对象,用于控制写入操作的互斥。因为Ray的远程函数(remote function)是运行在不同进程中的,所以不能直接使用普通的全局变量,需要将缓冲区和锁都通过Ray的共享机制传递。
具体初始化代码如下:
import ray
import threading
# 初始化Ray
ray.init()
# 定义共享缓冲区和锁的持有对象
@ray.remote
class SharedBuffer:
def __init__(self):
# 缓冲区存储Worker写入的数据
self.buffer = []
# 写入操作的锁
self.lock = threading.Lock()
def write(self, data):
# 加锁保证同一时间只有一个Worker写入
with self.lock:
self.buffer.append(data)
def read(self):
# 读取时加锁获取当前缓冲区的副本,避免读取过程中被修改
with self.lock:
return list(self.buffer)
def clear(self):
# 清空缓冲区的操作也加锁
with self.lock:
self.buffer.clear()
# 创建共享缓冲区实例
shared_buffer = SharedBuffer.remote()
Worker并发写入实现
接下来定义Worker的远程任务,每个Worker处理完自己的逻辑后,调用共享缓冲区的write方法写入数据。因为write方法内部已经加了锁,所以多个Worker同时调用也不会出现数据竞争。
Worker的实现代码如下:
@ray.remote
def worker_task(worker_id, buffer, task_num):
# 模拟Worker处理多个任务,每个任务完成后写入结果
for i in range(task_num):
# 模拟任务处理耗时
import time
time.sleep(0.1)
# 构造当前任务的结果
result = f"Worker_{worker_id}_task_{i}_result"
# 调用共享缓冲区的写入方法
buffer.write.remote(result)
return f"Worker_{worker_id}完成所有任务"
# 启动4个Worker,每个Worker处理3个任务
worker_tasks = []
for i in range(4):
task = worker_task.remote(i, shared_buffer, 3)
worker_tasks.append(task)
# 等待所有Worker任务完成
ray.get(worker_tasks)
print("所有Worker写入完成")
主线程安全读取数据
所有Worker完成写入后,主线程可以调用共享缓冲区的read方法读取数据。read方法内部同样加了锁,会返回当前缓冲区的完整副本,主线程拿到副本后可以进行后续处理,不需要担心读取过程中缓冲区被修改的问题。
读取代码如下:
# 主线程调用read方法获取缓冲区数据
buffer_data = ray.get(shared_buffer.read.remote())
print(f"主线程读取到的数据总量:{len(buffer_data)}")
print("前5条数据示例:")
for item in buffer_data[:5]:
print(item)
# 如果需要清空缓冲区,可以调用clear方法
ray.get(shared_buffer.clear.remote())
print("缓冲区已清空")
注意事项
- 锁的作用范围是单个
SharedBuffer实例内部,如果有多个共享缓冲区实例,需要分别加锁,不能共用同一个锁对象。 - 如果Worker写入的数据量非常大,共享缓冲区会占用较多内存,此时可以考虑定期让主线程读取并清空缓冲区,避免内存溢出。
- 读取操作返回的缓冲区副本是独立的列表对象,主线程后续对副本的修改不会影响共享缓冲区的原始数据。
- 如果需要在读取时同时获取写入状态,可以在
SharedBuffer中增加一个写入完成的标记字段,Worker全部完成后修改该标记,主线程读取前先判断标记状态。
常见问题排查
如果读取到的数据不全,可以先检查Worker是否全部执行完成,再确认write方法的锁是否生效。如果出现数据重复,需要检查是否有Worker重复提交相同的任务结果。如果程序报锁相关的错误,需要确认锁对象是在SharedBuffer内部初始化的,没有被意外修改或释放。