在文件管理、数据安全防护等场景中,实时掌握文件夹内的变动情况是实现文件审计的核心需求。Python的watchdog库是专门用于文件系统事件监听的第三方库,支持Windows、Linux、macOS等主流操作系统,能够精准捕获文件夹内文件的创建、修改、删除、移动等所有操作,非常适合用来实现文件实时审计功能。
watchdog库核心概念
使用watchdog库实现监控功能前,需要先了解几个核心组件:
- Observer:观察者对象,负责调度监控任务,监听文件系统事件并分发给对应的处理器
- EventHandler:事件处理器基类,所有自定义的事件处理逻辑都需要继承该类并重写对应的方法
- 监控事件类型:包含文件创建(CreatedEvent)、文件修改(ModifiedEvent)、文件删除(DeletedEvent)、文件移动(MovedEvent)等
环境准备
首先需要安装watchdog库,使用pip命令即可完成安装:
pip install watchdog
基础监控实现
下面先实现一个最基础的文件夹监控程序,能够打印出文件夹内发生的所有变动事件:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 自定义事件处理器,继承FileSystemEventHandler
class FileAuditHandler(FileSystemEventHandler):
def on_created(self, event):
# 文件创建事件处理
if not event.is_directory:
print(f"文件创建: {event.src_path}")
def on_modified(self, event):
# 文件修改事件处理
if not event.is_directory:
print(f"文件修改: {event.src_path}")
def on_deleted(self, event):
# 文件删除事件处理
if not event.is_directory:
print(f"文件删除: {event.src_path}")
def on_moved(self, event):
# 文件移动事件处理
if not event.is_directory:
print(f"文件移动: 从 {event.src_path} 到 {event.dest_path}")
def start_monitor(path):
# 创建事件处理器实例
event_handler = FileAuditHandler()
# 创建观察者实例
observer = Observer()
# 设置监控目录,recursive=True表示递归监控子目录
observer.schedule(event_handler, path, recursive=True)
# 启动观察者
observer.start()
print(f"开始监控目录: {path}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
# 捕获Ctrl+C中断,停止监控
observer.stop()
observer.join()
if __name__ == "__main__":
# 替换为需要监控的文件夹路径
monitor_path = "./test_dir"
start_monitor(monitor_path)
运行上述代码后,在./test_dir目录下创建、修改、删除或移动文件,控制台都会实时输出对应的变动信息。
实现文件实时审计功能
基础的事件打印无法满足审计需求,我们需要把变动信息持久化存储,同时增加更多审计相关的字段,比如操作时间、操作用户等。下面扩展上述代码,实现完整的文件实时审计功能:
import time
import json
import os
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileRealTimeAuditHandler(FileSystemEventHandler):
def __init__(self, log_path):
# 审计日志存储路径
self.log_path = log_path
# 确保日志目录存在
log_dir = os.path.dirname(log_path)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
def _write_log(self, event_type, file_path, extra_info=None):
# 构造审计日志条目
log_item = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"event_type": event_type,
"file_path": file_path,
"extra_info": extra_info or {}
}
# 将日志写入文件,每条日志占一行
with open(self.log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(log_item, ensure_ascii=False) + "n")
# 同时打印到控制台
print(f"审计记录: {log_item}")
def on_created(self, event):
if not event.is_directory:
self._write_log("文件创建", event.src_path)
def on_modified(self, event):
if not event.is_directory:
self._write_log("文件修改", event.src_path)
def on_deleted(self, event):
if not event.is_directory:
self._write_log("文件删除", event.src_path)
def on_moved(self, event):
if not event.is_directory:
extra = {"原路径": event.src_path, "新路径": event.dest_path}
self._write_log("文件移动", event.dest_path, extra)
def start_audit_monitor(monitor_path, log_path):
event_handler = FileRealTimeAuditHandler(log_path)
observer = Observer()
observer.schedule(event_handler, monitor_path, recursive=True)
observer.start()
print(f"文件实时审计已启动,监控目录: {monitor_path},日志存储: {log_path}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
print("审计监控已停止")
observer.join()
if __name__ == "__main__":
# 监控目录路径
monitor_dir = "./important_files"
# 审计日志存储路径
audit_log = "./audit_logs/file_audit.log"
start_audit_monitor(monitor_dir, audit_log)
上述代码会将所有文件变动的审计记录以JSON格式写入指定的日志文件,每条记录包含操作时间、事件类型、文件路径以及额外的关联信息,方便后续查询和审计分析。
常见问题与优化
1. 重复触发修改事件
部分编辑器保存文件时会先创建临时文件再替换原文件,可能导致on_modified事件重复触发。可以通过增加文件路径去重、设置事件触发间隔阈值来优化,比如1秒内同一文件的修改事件只记录一次。
2. 监控大量文件性能问题
如果需要监控的目录包含大量文件或者深层嵌套子目录,可以适当调整Observer的轮询间隔,或者过滤不需要监控的文件类型,减少不必要的事件处理开销。
3. 跨平台兼容性
watchdog库已经做了跨平台适配,在Windows、Linux、macOS系统上都可以正常运行,不需要额外修改代码,仅需要注意不同系统下的文件路径格式差异即可。
总结
通过watchdog库可以快速实现文件夹变动监控和文件实时审计功能,核心逻辑是继承FileSystemEventHandler类重写事件处理方法,结合Observer对象启动监控任务。本文提供的示例代码可以直接落地使用,也可以根据实际需求扩展告警通知、变动文件内容备份等更多功能,满足不同场景下的文件审计需求。