导读:本期聚焦于小伙伴创作的《如何让多个 Ray Worker 并发写入共享数据缓冲区并由主线程安全读取》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何让多个 Ray Worker 并发写入共享数据缓冲区并由主线程安全读取》有用,将其分享出去将是对创作者最好的鼓励。

在Ray分布式计算场景中,多个Worker并发处理任务后将结果写入共享数据缓冲区,再由主线程统一读取是常见需求,不过并发写入容易引发数据竞争问题,需要通过合适的同步机制保证操作安全。

如何让多个 Ray Worker 并发写入共享数据缓冲区并由主线程安全读取

核心实现思路

要实现多Worker并发写入、主线程安全读取,核心需要解决两个问题:一是提供所有Worker都能访问的共享缓冲区,二是给写入操作加锁避免并发冲突。Ray中可以通过ray.put创建共享对象,结合Python的threading.Lock实现写入同步,主线程读取时只需要获取缓冲区的当前快照即可保证安全。

共享缓冲区的初始化

首先我们需要初始化一个共享的列表作为数据缓冲区,同时创建一个全局的锁对象,用于控制写入操作的互斥。因为Ray的远程函数(remote function)是运行在不同进程中的,所以不能直接使用普通的全局变量,需要将缓冲区和锁都通过Ray的共享机制传递。

具体初始化代码如下:

import ray
import threading

# 初始化Ray
ray.init()

# 定义共享缓冲区和锁的持有对象
@ray.remote
class SharedBuffer:
    def __init__(self):
        # 缓冲区存储Worker写入的数据
        self.buffer = []
        # 写入操作的锁
        self.lock = threading.Lock()
    
    def write(self, data):
        # 加锁保证同一时间只有一个Worker写入
        with self.lock:
            self.buffer.append(data)
    
    def read(self):
        # 读取时加锁获取当前缓冲区的副本,避免读取过程中被修改
        with self.lock:
            return list(self.buffer)
    
    def clear(self):
        # 清空缓冲区的操作也加锁
        with self.lock:
            self.buffer.clear()

# 创建共享缓冲区实例
shared_buffer = SharedBuffer.remote()

Worker并发写入实现

接下来定义Worker的远程任务,每个Worker处理完自己的逻辑后,调用共享缓冲区的write方法写入数据。因为write方法内部已经加了锁,所以多个Worker同时调用也不会出现数据竞争。

Worker的实现代码如下:

@ray.remote
def worker_task(worker_id, buffer, task_num):
    # 模拟Worker处理多个任务,每个任务完成后写入结果
    for i in range(task_num):
        # 模拟任务处理耗时
        import time
        time.sleep(0.1)
        # 构造当前任务的结果
        result = f"Worker_{worker_id}_task_{i}_result"
        # 调用共享缓冲区的写入方法
        buffer.write.remote(result)
    return f"Worker_{worker_id}完成所有任务"

# 启动4个Worker,每个Worker处理3个任务
worker_tasks = []
for i in range(4):
    task = worker_task.remote(i, shared_buffer, 3)
    worker_tasks.append(task)

# 等待所有Worker任务完成
ray.get(worker_tasks)
print("所有Worker写入完成")

主线程安全读取数据

所有Worker完成写入后,主线程可以调用共享缓冲区的read方法读取数据。read方法内部同样加了锁,会返回当前缓冲区的完整副本,主线程拿到副本后可以进行后续处理,不需要担心读取过程中缓冲区被修改的问题。

读取代码如下:

# 主线程调用read方法获取缓冲区数据
buffer_data = ray.get(shared_buffer.read.remote())
print(f"主线程读取到的数据总量:{len(buffer_data)}")
print("前5条数据示例:")
for item in buffer_data[:5]:
    print(item)

# 如果需要清空缓冲区,可以调用clear方法
ray.get(shared_buffer.clear.remote())
print("缓冲区已清空")

注意事项

  • 锁的作用范围是单个SharedBuffer实例内部,如果有多个共享缓冲区实例,需要分别加锁,不能共用同一个锁对象。
  • 如果Worker写入的数据量非常大,共享缓冲区会占用较多内存,此时可以考虑定期让主线程读取并清空缓冲区,避免内存溢出。
  • 读取操作返回的缓冲区副本是独立的列表对象,主线程后续对副本的修改不会影响共享缓冲区的原始数据。
  • 如果需要在读取时同时获取写入状态,可以在SharedBuffer中增加一个写入完成的标记字段,Worker全部完成后修改该标记,主线程读取前先判断标记状态。

常见问题排查

如果读取到的数据不全,可以先检查Worker是否全部执行完成,再确认write方法的锁是否生效。如果出现数据重复,需要检查是否有Worker重复提交相同的任务结果。如果程序报锁相关的错误,需要确认锁对象是在SharedBuffer内部初始化的,没有被意外修改或释放。

Ray共享数据缓冲区并发写入线程安全Python修改时间:2026-06-24 07:30:32

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。