Python多进程核心原理
Python多进程的核心依托于操作系统的进程管理机制,通过multiprocessing模块实现跨平台的多进程支持。在Unix类系统中,该模块基于fork系统调用创建子进程,子进程会复制父进程的内存空间和资源;在Windows系统中则通过创建新的Python解释器实例来启动子进程,子进程会重新导入主模块的代码。
由于Python的全局解释器锁GIL仅限制单个进程内的多线程并发,多进程可以真正利用多核CPU的计算能力,因此更适合CPU密集型任务。每个进程拥有独立的内存空间,进程之间的数据默认不共享,需要通过特定的通信机制来传递信息。
进程创建与基本使用
multiprocessing模块中的Process类是创建进程的基础,通过实例化Process对象并指定target参数即可定义子进程要执行的函数。以下是创建单个子进程的基础示例:
import multiprocessing
import time
def worker_task(name):
print(f"子进程 {name} 开始执行")
time.sleep(2)
print(f"子进程 {name} 执行结束")
if __name__ == "__main__":
# 创建进程实例,target为子进程执行的函数,args为函数参数
process = multiprocessing.Process(target=worker_task, args=("test_process",))
# 启动进程
process.start()
# 等待子进程执行完成
process.join()
print("主进程执行结束")
需要注意的是,在Windows系统中必须将进程创建和执行代码放在if __name__ == "__main__":条件下,否则会出现递归创建进程的问题。
进程间通信机制
由于进程内存空间独立,multiprocessing模块提供了多种进程间通信的方式,常用的包括Queue队列、Pipe管道、Value和Array共享变量等。
Queue队列通信
Queue是线程和进程安全的队列,可实现多个进程之间的数据传递,以下是使用Queue实现主进程向子进程传递数据的示例:
import multiprocessing
import time
def consumer(queue):
while True:
# 从队列中获取数据,如果队列为空会阻塞等待
data = queue.get()
if data == "STOP":
print("消费者收到停止信号,退出执行")
break
print(f"消费者收到数据:{data}")
time.sleep(1)
if __name__ == "__main__":
# 创建进程间通信的队列
task_queue = multiprocessing.Queue()
# 创建消费者进程
consumer_process = multiprocessing.Process(target=consumer, args=(task_queue,))
consumer_process.start()
# 主进程向队列中放入任务数据
for i in range(5):
task_queue.put(f"任务{i}")
print(f"主进程放入任务{i}")
# 放入停止信号
task_queue.put("STOP")
consumer_process.join()
print("主进程执行结束")
共享变量通信
如果需要多个进程共享一个变量,可以使用Value或者Array,以下是使用Value实现共享计数器计数的示例:
import multiprocessing
def increment_counter(counter, lock):
# 获取锁,保证计数操作的原子性
lock.acquire()
for _ in range(1000):
counter.value += 1
lock.release()
if __name__ == "__main__":
# 创建共享变量,类型为无符号整数,初始值为0
shared_counter = multiprocessing.Value("i", 0)
# 创建进程锁,避免多个进程同时修改共享变量出现数据不一致
process_lock = multiprocessing.Lock()
process_list = []
# 创建5个子进程执行计数任务
for _ in range(5):
p = multiprocessing.Process(target=increment_counter, args=(shared_counter, process_lock))
p.start()
process_list.append(p)
# 等待所有子进程执行完成
for p in process_list:
p.join()
print(f"最终共享计数器的值:{shared_counter.value}")
Python多进程实战案例
进程池批量处理任务
当需要处理大量同类型的任务时,频繁创建和销毁进程会带来额外的性能开销,此时可以使用进程池Pool来管理进程。进程池会预先创建一定数量的进程,重复利用这些进程处理任务,提升执行效率。
以下是使用进程池批量处理文件解析任务的示例,假设我们需要解析多个文本文件,统计每个文件的行数:
import multiprocessing
import os
def count_file_lines(file_path):
"""统计单个文件的行数"""
try:
with open(file_path, "r", encoding="utf-8") as f:
line_count = len(f.readlines())
return file_path, line_count
except Exception as e:
return file_path, f"解析失败:{str(e)}"
if __name__ == "__main__":
# 待处理的文件列表,实际场景中可以从目录扫描获取
file_list = ["file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt"]
# 创建进程池,进程数量为CPU核心数
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
# 使用map方法批量提交任务,返回所有任务的结果
results = pool.map(count_file_lines, file_list)
# 关闭进程池,不再接收新任务
pool.close()
# 等待所有进程执行完成
pool.join()
# 输出结果
for file_path, line_count in results:
print(f"文件 {file_path} 的行数:{line_count}")
多进程分发计算任务
对于CPU密集型的数值计算任务,比如批量计算大数组的哈希值,使用多进程可以充分利用多核CPU提升计算速度。以下是使用进程池的apply_async方法异步提交任务的示例:
import multiprocessing
import hashlib
def calculate_hash(data_chunk):
"""计算数据块的SHA256哈希值"""
hash_obj = hashlib.sha256()
hash_obj.update(data_chunk)
return hash_obj.hexdigest()
if __name__ == "__main__":
# 模拟待计算的大数据,拆分为多个数据块
data_chunks = [b"data_chunk_1", b"data_chunk_2", b"data_chunk_3", b"data_chunk_4"]
pool = multiprocessing.Pool(processes=4)
result_list = []
# 异步提交任务,获取结果对象
for chunk in data_chunks:
result = pool.apply_async(calculate_hash, (chunk,))
result_list.append(result)
# 关闭进程池
pool.close()
pool.join()
# 获取所有任务的结果
for i, result in enumerate(result_list):
print(f"数据块{i+1}的哈希值:{result.get()}")
多进程使用注意事项
- 进程间通信尽量使用官方提供的Queue、Pipe等机制,避免自行实现复杂的共享内存逻辑,减少出错概率。
- 使用共享变量时必须配合锁机制,否则多个进程同时修改共享变量会导致数据不一致。
- Windows系统下进程创建成本较高,不适合创建过多进程,建议进程数量不超过CPU核心数的2倍。
- 如果任务属于IO密集型,多进程的优势不如多线程或者协程明显,需要根据任务类型选择合适的并发方案。
- 子进程中不要使用不可序列化的对象作为函数参数,否则会导致进程创建失败。
Python多进程Processmultiprocessing进程池修改时间:2026-06-22 05:45:48