Python多进程日志收集的实现思路有哪些

来源:Vuejs社区作者:松本一香头衔:网络博主
导读:本期聚焦于小伙伴创作的《Python多进程日志收集的实现思路有哪些》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《Python多进程日志收集的实现思路有哪些》有用,将其分享出去将是对创作者最好的鼓励。

Python多进程开发中,多个进程同时操作同一个日志文件时,会因为文件写入的竞争导致日志内容丢失、顺序错乱甚至文件损坏,因此需要根据实际场景选择合适的日志收集实现思路,保障日志的完整性和准确性。

Python多进程日志收集的实现思路有哪些

常见实现思路分析

1. 进程独立日志文件

每个子进程单独创建自己的日志文件,进程之间完全不共享日志写入对象,从根源上避免文件竞争问题。这种思路实现简单,适合进程数量固定且不需要统一汇总日志的场景。

实现时可以在子进程启动时,根据进程ID或者自定义标识生成独立的日志文件路径,每个进程的logging配置仅指向自己的文件。示例代码如下:

import logging
import multiprocessing
import os
import time

def init_process_logger(process_id):
    # 为每个进程创建独立的日志文件
    logger = logging.getLogger(f"process_{process_id}")
    logger.setLevel(logging.INFO)
    # 避免日志向上传递到根logger
    logger.propagate = False
    # 创建文件处理器,路径包含进程ID
    file_handler = logging.FileHandler(f"process_{process_id}_log.log", encoding="utf-8")
    formatter = logging.Formatter('%(asctime)s - %(processName)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    return logger

def worker_process(process_id):
    logger = init_process_logger(process_id)
    for i in range(5):
        logger.info(f"进程{process_id}输出第{i}条日志")
        time.sleep(0.1)

if __name__ == "__main__":
    process_list = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process, args=(i,), name=f"worker_{i}")
        process_list.append(p)
        p.start()
    for p in process_list:
        p.join()

2. 队列中转日志

创建一个公共的日志队列,所有子进程将日志内容发送到队列中,由单独的日志收集进程从队列中取出日志并写入统一文件,避免多进程直接操作日志文件。这种思路适合需要统一汇总所有进程日志的场景。

需要注意队列的传递方式,multiprocessing模块提供的Queue可以在进程间安全传递日志内容。示例代码如下:

import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener
import time

def worker_process(log_queue, process_id):
    # 子进程使用QueueHandler将日志发送到队列
    logger = logging.getLogger(f"worker_{process_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    queue_handler = QueueHandler(log_queue)
    logger.addHandler(queue_handler)
    for i in range(5):
        logger.info(f"子进程{process_id}的第{i}条日志")
        time.sleep(0.1)

def log_listener(log_queue):
    # 日志收集进程,从队列取日志写入文件
    logger = logging.getLogger("log_listener")
    logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler("all_process_log.log", encoding="utf-8")
    formatter = logging.Formatter('%(asctime)s - %(processName)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    # 启动QueueListener监听队列
    listener = QueueListener(log_queue, file_handler)
    listener.start()
    return listener

if __name__ == "__main__":
    log_queue = multiprocessing.Queue()
    # 启动日志收集进程
    listener = log_listener(log_queue)
    process_list = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process, args=(log_queue, i), name=f"worker_{i}")
        process_list.append(p)
        p.start()
    for p in process_list:
        p.join()
    # 所有子进程结束后停止监听
    listener.stop()

3. Socket转发日志

将日志通过Socket发送到远程或者本地的日志接收服务,由接收服务统一处理日志存储。这种思路适合分布式多进程场景,或者需要将日志输出到第三方日志系统的场景。

实现时可以使用logging的SocketHandler,子进程将日志序列化后发送到指定的Socket服务端,服务端接收后解析并写入文件。示例代码如下:

import logging
import multiprocessing
import socket
import pickle
import struct
import time
from logging.handlers import SocketHandler

class CustomSocketHandler(SocketHandler):
    def makePickle(self, record):
        # 自定义序列化逻辑,添加进程信息
        record.process_name = multiprocessing.current_process().name
        return pickle.dumps(record)

def log_server(host="127.0.0.1", port=9020):
    # 启动Socket日志接收服务
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(5)
    print(f"日志服务启动,监听{host}:{port}")
    file_handler = logging.FileHandler("socket_log.log", encoding="utf-8")
    formatter = logging.Formatter('%(asctime)s - %(process_name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    while True:
        conn, addr = sock.accept()
        try:
            while True:
                # 读取日志长度
                chunk = conn.recv(4)
                if len(chunk) < 4:
                    break
                length = struct.unpack(">L", chunk)[0]
                # 读取完整日志数据
                data = b""
                while len(data) < length:
                    chunk = conn.recv(length - len(data))
                    if not chunk:
                        break
                    data += chunk
                if not data:
                    break
                # 反序列化日志记录
                record = pickle.loads(data)
                # 写入文件
                file_handler.handle(record)
        except Exception as e:
            print(f"接收日志出错:{e}")
        finally:
            conn.close()

def worker_process(process_id):
    logger = logging.getLogger(f"worker_{process_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    # 使用自定义SocketHandler发送日志
    socket_handler = CustomSocketHandler("127.0.0.1", 9020)
    logger.addHandler(socket_handler)
    for i in range(5):
        logger.info(f"Socket方式进程{process_id}的第{i}条日志")
        time.sleep(0.1)

if __name__ == "__main__":
    # 先启动日志服务进程
    server_process = multiprocessing.Process(target=log_server, daemon=True)
    server_process.start()
    time.sleep(1)
    process_list = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process, args=(i,), name=f"worker_{i}")
        process_list.append(p)
        p.start()
    for p in process_list:
        p.join()
    time.sleep(1)

不同思路的适用场景对比

可以根据实际需求选择对应的实现方案,以下是三种思路的对比:

实现思路适用场景优点缺点
进程独立日志文件进程数量少,不需要汇总日志实现简单,无额外依赖日志分散,不便于统一查看
队列中转日志单机多进程,需要统一汇总日志日志集中,无文件竞争依赖队列,收集进程故障会导致日志丢失
Socket转发日志分布式多进程,需要对接外部日志系统扩展性强,支持跨机器收集实现复杂,需要维护Socket服务

实现注意事项

  • 无论选择哪种思路,都要避免多个进程同时打开同一个日志文件进行写入操作,这是多进程日志问题的核心诱因。
  • 使用队列或者Socket方式时,要做好异常处理,避免子进程因为日志发送失败导致业务逻辑中断。
  • logging模块的logger对象不要在进程间直接传递,最好在子进程内部单独初始化,避免对象序列化问题。
  • 如果进程会频繁创建销毁,建议选择队列或者Socket方式,减少日志文件的创建和清理成本。

Python多进程日志收集logging_module修改时间:2026-07-02 12:33:52

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。