基于优先级的异步日志写入系统结合线程池方案,能够有效平衡日志处理的实时性和系统资源占用,避免高优先级日志被低优先级日志阻塞,同时利用线程池复用线程减少线程创建销毁的开销。下面介绍完整的实现过程。
核心设计思路
整个系统分为三个核心部分:日志任务封装、优先级队列管理、线程池调度。日志任务需要携带优先级标识,优先级队列按照优先级排序任务,线程池中的工作线程从队列中获取任务执行日志写入操作。优先级越高,任务越先被处理。
日志任务结构设计
首先需要定义日志任务的结构,包含日志内容、优先级、时间戳等信息,同时需要支持优先级比较,方便优先级队列排序。
#include <string>
#include <chrono>
#include <iostream>
// 日志优先级枚举,数值越小优先级越高
enum class LogPriority {
DEBUG = 0,
INFO = 1,
WARN = 2,
ERROR = 3
};
// 日志任务结构体
struct LogTask {
std::string content; // 日志内容
LogPriority priority; // 日志优先级
std::chrono::system_clock::time_point timestamp; // 日志时间戳
// 优先级比较函数,用于优先级队列排序
bool operator<(const LogTask& other) const {
// 优先级相同则按照时间戳排序,早的先处理
if (priority == other.priority) {
return timestamp > other.timestamp;
}
return priority > other.priority;
}
};
优先级队列封装
使用C++标准库的priority_queue作为任务容器,但是需要保证线程安全,添加互斥锁和条件变量实现多线程下的安全访问。
#include <queue>
#include <mutex>
#include <condition_variable>
// 线程安全的优先级队列
class ThreadSafePriorityQueue {
private:
std::priority_queue<LogTask> queue;
mutable std::mutex mtx;
std::condition_variable cond;
public:
// 插入任务
void push(const LogTask& task) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(task);
cond.notify_one(); // 通知等待的工作线程
}
// 取出任务,队列为空时阻塞等待
bool pop(LogTask& task) {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列非空
cond.wait(lock, [this]() { return !queue.empty(); });
task = queue.top();
queue.pop();
return true;
}
// 判断队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.empty();
}
};
线程池实现
线程池需要管理一组工作线程,这些线程循环从优先级队列中获取日志任务并执行写入操作。线程池支持启动、停止,以及任务提交接口。
#include <vector>
#include <thread>
#include <atomic>
#include <functional>
class LogThreadPool {
private:
std::vector<std::thread> workers; // 工作线程集合
ThreadSafePriorityQueue taskQueue; // 任务优先级队列
std::atomic<bool> stopFlag; // 停止标志
// 工作线程执行函数
void workerThread() {
while (!stopFlag.load()) {
LogTask task;
// 从队列获取任务,队列为空时会阻塞
if (taskQueue.pop(task)) {
writeLog(task);
}
}
// 停止后处理剩余任务
while (!taskQueue.empty()) {
LogTask task;
taskQueue.pop(task);
writeLog(task);
}
}
// 实际写入日志的函数,可替换为文件写入、网络发送等逻辑
void writeLog(const LogTask& task) {
auto time = std::chrono::system_clock::to_time_t(task.timestamp);
std::string priorityStr;
switch (task.priority) {
case LogPriority::DEBUG: priorityStr = "DEBUG"; break;
case LogPriority::INFO: priorityStr = "INFO"; break;
case LogPriority::WARN: priorityStr = "WARN"; break;
case LogPriority::ERROR: priorityStr = "ERROR"; break;
}
// 这里输出到控制台,实际场景可替换为文件写入
std::cout << "[" << std::ctime(&time) << "] [" << priorityStr << "] " << task.content << std::endl;
}
public:
// 构造函数,初始化线程数量
explicit LogThreadPool(size_t threadNum) : stopFlag(false) {
for (size_t i = 0; i < threadNum; ++i) {
workers.emplace_back([this]() { workerThread(); });
}
}
// 提交日志任务
void submit(const LogTask& task) {
if (!stopFlag.load()) {
taskQueue.push(task);
}
}
// 停止线程池
void stop() {
stopFlag.store(true);
// 等待所有工作线程退出
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
~LogThreadPool() {
if (!stopFlag.load()) {
stop();
}
}
};
系统整合与使用示例
将日志任务封装、优先级队列、线程池整合起来,对外提供简单的日志写入接口,调用方只需要传入日志内容和优先级即可。
#include <memory>
// 日志系统单例类
class AsyncLogSystem {
private:
std::unique_ptr<LogThreadPool> threadPool;
static AsyncLogSystem* instance;
static std::mutex instanceMtx;
AsyncLogSystem() {
// 初始化线程池,设置4个工作线程
threadPool = std::make_unique<LogThreadPool>(4);
}
public:
// 获取单例实例
static AsyncLogSystem* getInstance() {
std::lock_guard<std::mutex> lock(instanceMtx);
if (instance == nullptr) {
instance = new AsyncLogSystem();
}
return instance;
}
// 写入日志接口
void writeLog(const std::string& content, LogPriority priority) {
LogTask task;
task.content = content;
task.priority = priority;
task.timestamp = std::chrono::system_clock::now();
threadPool->submit(task);
}
// 销毁实例
static void destroy() {
std::lock_guard<std::mutex> lock(instanceMtx);
if (instance != nullptr) {
delete instance;
instance = nullptr;
}
}
};
// 静态成员初始化
AsyncLogSystem* AsyncLogSystem::instance = nullptr;
std::mutex AsyncLogSystem::instanceMtx;
// 使用示例
int main() {
AsyncLogSystem* logSystem = AsyncLogSystem::getInstance();
// 提交不同优先级的日志任务
logSystem->writeLog("这是一条调试日志", LogPriority::DEBUG);
logSystem->writeLog("这是一条信息日志", LogPriority::INFO);
logSystem->writeLog("这是一条错误日志", LogPriority::ERROR);
logSystem->writeLog("这是一条警告日志", LogPriority::WARN);
// 等待日志处理完成,实际场景中可根据需求调整等待逻辑
std::this_thread::sleep_for(std::chrono::seconds(1));
AsyncLogSystem::destroy();
return 0;
}
注意事项
- 日志写入的实际逻辑可根据需求替换,比如写入文件、发送到远程日志服务器等,注意文件写入需要额外做线程安全处理。
- 优先级队列的比较逻辑可以根据业务需求调整,比如相同优先级的任务可以按照提交顺序处理。
- 线程池的线程数量需要根据实际业务场景调整,避免线程过多导致上下文切换开销过大。
- 系统停止时需要保证所有已提交的日志任务都被处理完成,避免日志丢失。