Python多进程开发中,多个进程同时操作同一个日志文件时,会因为文件写入的竞争导致日志内容丢失、顺序错乱甚至文件损坏,因此需要根据实际场景选择合适的日志收集实现思路,保障日志的完整性和准确性。

常见实现思路分析
1. 进程独立日志文件
每个子进程单独创建自己的日志文件,进程之间完全不共享日志写入对象,从根源上避免文件竞争问题。这种思路实现简单,适合进程数量固定且不需要统一汇总日志的场景。
实现时可以在子进程启动时,根据进程ID或者自定义标识生成独立的日志文件路径,每个进程的logging配置仅指向自己的文件。示例代码如下:
import logging
import multiprocessing
import os
import time
def init_process_logger(process_id):
# 为每个进程创建独立的日志文件
logger = logging.getLogger(f"process_{process_id}")
logger.setLevel(logging.INFO)
# 避免日志向上传递到根logger
logger.propagate = False
# 创建文件处理器,路径包含进程ID
file_handler = logging.FileHandler(f"process_{process_id}_log.log", encoding="utf-8")
formatter = logging.Formatter('%(asctime)s - %(processName)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def worker_process(process_id):
logger = init_process_logger(process_id)
for i in range(5):
logger.info(f"进程{process_id}输出第{i}条日志")
time.sleep(0.1)
if __name__ == "__main__":
process_list = []
for i in range(3):
p = multiprocessing.Process(target=worker_process, args=(i,), name=f"worker_{i}")
process_list.append(p)
p.start()
for p in process_list:
p.join()
2. 队列中转日志
创建一个公共的日志队列,所有子进程将日志内容发送到队列中,由单独的日志收集进程从队列中取出日志并写入统一文件,避免多进程直接操作日志文件。这种思路适合需要统一汇总所有进程日志的场景。
需要注意队列的传递方式,multiprocessing模块提供的Queue可以在进程间安全传递日志内容。示例代码如下:
import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener
import time
def worker_process(log_queue, process_id):
# 子进程使用QueueHandler将日志发送到队列
logger = logging.getLogger(f"worker_{process_id}")
logger.setLevel(logging.INFO)
logger.propagate = False
queue_handler = QueueHandler(log_queue)
logger.addHandler(queue_handler)
for i in range(5):
logger.info(f"子进程{process_id}的第{i}条日志")
time.sleep(0.1)
def log_listener(log_queue):
# 日志收集进程,从队列取日志写入文件
logger = logging.getLogger("log_listener")
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("all_process_log.log", encoding="utf-8")
formatter = logging.Formatter('%(asctime)s - %(processName)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 启动QueueListener监听队列
listener = QueueListener(log_queue, file_handler)
listener.start()
return listener
if __name__ == "__main__":
log_queue = multiprocessing.Queue()
# 启动日志收集进程
listener = log_listener(log_queue)
process_list = []
for i in range(3):
p = multiprocessing.Process(target=worker_process, args=(log_queue, i), name=f"worker_{i}")
process_list.append(p)
p.start()
for p in process_list:
p.join()
# 所有子进程结束后停止监听
listener.stop()
3. Socket转发日志
将日志通过Socket发送到远程或者本地的日志接收服务,由接收服务统一处理日志存储。这种思路适合分布式多进程场景,或者需要将日志输出到第三方日志系统的场景。
实现时可以使用logging的SocketHandler,子进程将日志序列化后发送到指定的Socket服务端,服务端接收后解析并写入文件。示例代码如下:
import logging
import multiprocessing
import socket
import pickle
import struct
import time
from logging.handlers import SocketHandler
class CustomSocketHandler(SocketHandler):
def makePickle(self, record):
# 自定义序列化逻辑,添加进程信息
record.process_name = multiprocessing.current_process().name
return pickle.dumps(record)
def log_server(host="127.0.0.1", port=9020):
# 启动Socket日志接收服务
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen(5)
print(f"日志服务启动,监听{host}:{port}")
file_handler = logging.FileHandler("socket_log.log", encoding="utf-8")
formatter = logging.Formatter('%(asctime)s - %(process_name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
while True:
conn, addr = sock.accept()
try:
while True:
# 读取日志长度
chunk = conn.recv(4)
if len(chunk) < 4:
break
length = struct.unpack(">L", chunk)[0]
# 读取完整日志数据
data = b""
while len(data) < length:
chunk = conn.recv(length - len(data))
if not chunk:
break
data += chunk
if not data:
break
# 反序列化日志记录
record = pickle.loads(data)
# 写入文件
file_handler.handle(record)
except Exception as e:
print(f"接收日志出错:{e}")
finally:
conn.close()
def worker_process(process_id):
logger = logging.getLogger(f"worker_{process_id}")
logger.setLevel(logging.INFO)
logger.propagate = False
# 使用自定义SocketHandler发送日志
socket_handler = CustomSocketHandler("127.0.0.1", 9020)
logger.addHandler(socket_handler)
for i in range(5):
logger.info(f"Socket方式进程{process_id}的第{i}条日志")
time.sleep(0.1)
if __name__ == "__main__":
# 先启动日志服务进程
server_process = multiprocessing.Process(target=log_server, daemon=True)
server_process.start()
time.sleep(1)
process_list = []
for i in range(3):
p = multiprocessing.Process(target=worker_process, args=(i,), name=f"worker_{i}")
process_list.append(p)
p.start()
for p in process_list:
p.join()
time.sleep(1)
不同思路的适用场景对比
可以根据实际需求选择对应的实现方案,以下是三种思路的对比:
| 实现思路 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 进程独立日志文件 | 进程数量少,不需要汇总日志 | 实现简单,无额外依赖 | 日志分散,不便于统一查看 |
| 队列中转日志 | 单机多进程,需要统一汇总日志 | 日志集中,无文件竞争 | 依赖队列,收集进程故障会导致日志丢失 |
| Socket转发日志 | 分布式多进程,需要对接外部日志系统 | 扩展性强,支持跨机器收集 | 实现复杂,需要维护Socket服务 |
实现注意事项
- 无论选择哪种思路,都要避免多个进程同时打开同一个日志文件进行写入操作,这是多进程日志问题的核心诱因。
- 使用队列或者Socket方式时,要做好异常处理,避免子进程因为日志发送失败导致业务逻辑中断。
- logging模块的logger对象不要在进程间直接传递,最好在子进程内部单独初始化,避免对象序列化问题。
- 如果进程会频繁创建销毁,建议选择队列或者Socket方式,减少日志文件的创建和清理成本。
Python多进程日志收集logging_module修改时间:2026-07-02 12:33:52