为什么多线程 Logging 需要特别注意?
在单线程程序中,你调用 logging.info("..."),日志信息被直接写入到配置好的处理器(如文件、控制台)中。

但在多线程环境下,如果多个线程同时调用 logging 方法,可能会发生以下问题:
- 竞争条件:多个线程同时尝试写入同一个文件或同一个控制台输出流,导致日志条目交错、顺序错乱,甚至数据损坏。
- 性能瓶颈:日志写入(尤其是写入磁盘)是一个相对较慢的操作,如果每个线程都要等待 I/O 完成,会严重影响程序的并发性能。
幸运的是,Python 的 logging 模块在设计上已经内置了线程安全的机制,我们只需要正确使用它,就能避免这些问题。
logging 模块的线程安全机制
Python 的 logging 模块通过一个 模块级别的锁 来保证线程安全。
logging._lock: 这是一个内部的threading.RLock(可重入锁)。- 工作原理:每当一个线程需要调用
Logger、Handler或Formatter的方法时,它会首先获取这个锁,在操作完成后,再释放锁,这样,在任何时刻,只有一个线程能执行核心的日志记录逻辑,从而避免了竞争条件。
你不需要手动加锁来保证 logging 的线程安全,直接在每个线程中像单线程一样使用 logging 即可。

基础实践:一个简单的多线程日志示例
让我们从一个最简单的例子开始,看看线程安全的日志是如何工作的。
import logging
import threading
import time
# 1. 配置日志系统 (通常在程序入口处只配置一次)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'), # 同时输出到文件和控制台
logging.StreamHandler()
]
)
def worker(num):
"""一个简单的线程工作函数"""
logger = logging.getLogger(f"Worker-{num}")
logger.info(f"Thread {num} has started.")
for i in range(3):
time.sleep(0.5)
logger.info(f"Thread {num} is working... count {i}")
logger.info(f"Thread {num} has finished.")
if __name__ == "__main__":
threads = []
# 创建并启动5个线程
for i in range(5):
thread = threading.Thread(target=worker, args=(i,), name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
logging.info("All threads have completed.")
代码解释:
basicConfig: 在main模块中,我们只调用一次basicConfig来配置日志的格式、级别和处理器,这是一个最佳实践。%(threadName)s: 在format字符串中,我们使用了%(threadName)s这个占位符,它会自动记录当前正在执行日志记录的线程名称,这对于调试多线程程序非常有用。logging.getLogger(f"Worker-{num}"): 在每个线程函数内部,我们获取一个Logger实例,虽然这里我们使用了不同的名字,但它们都指向同一个日志层次结构,并且最终都会使用我们在basicConfig中配置的处理器。threading.Thread: 创建并启动了5个线程,它们会并发执行worker函数。
运行结果 (app.log 和控制台输出): 你会看到日志条目虽然来自不同线程,但它们是完整且顺序正确的(顺序可能因线程调度而略有不同,但每个线程的日志块是完整的),没有出现交错的情况。
2025-10-27 10:30:00,123 - Thread-0 - INFO - Thread 0 has started.
2025-10-27 10:30:00,123 - Thread-1 - INFO - Thread 1 has started.
2025-10-27 10:30:00,123 - Thread-2 - INFO - Thread 2 has started.
2025-10-27 10:30:00,123 - Thread-3 - INFO - Thread 3 has started.
2025-10-27 10:30:00,123 - Thread-4 - INFO - Thread 4 has started.
2025-10-27 10:30:00,623 - Thread-0 - INFO - Thread 0 is working... count 0
2025-10-27 10:30:00,623 - Thread-1 - INFO - Thread 1 is working... count 0
... (其他线程的日志) ...
2025-10-27 10:30:02,123 - Thread-0 - INFO - Thread 0 has finished.
2025-10-27 10:30:02,123 - Thread-1 - INFO - Thread 1 has finished.
... (其他线程的日志) ...
2025-10-27 10:30:02,123 - MainThread - INFO - All threads have completed.
进阶实践:自定义 Handler 以提高性能
直接在每次日志记录时都进行磁盘 I/O,仍然是性能瓶颈,一个常见的优化模式是使用 QueueHandler 和 QueueListener。
QueueHandler: 线程安全的处理器,它不直接处理日志,而是将日志记录放入一个queue(队列) 中,然后立即返回,这极大地减少了日志记录的耗时。QueueListener: 在一个独立的、专门的线程中运行,它从queue中取出日志记录,然后分发给其他配置好的处理器(如FileHandler,StreamHandler)进行实际的 I/O 操作。
这种模式将日志记录的“生产”和“消费”解耦,是 Python 官方推荐的高性能多线程日志方案。
示例代码
import logging
import logging.handlers
import threading
import time
import queue
# 1. 创建一个队列作为日志消息的缓冲区
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
# 2. 配置监听器及其最终处理器
# 这里我们配置一个文件处理器和一个控制台处理器
file_handler = logging.FileHandler('advanced_app.log')
console_handler = logging.StreamHandler()
# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 3. 创建并启动 QueueListener
# 监听器会启动一个后台线程来处理队列中的日志
listener = logging.handlers.QueueListener(log_queue, file_handler, console_handler)
listener.start()
# 4. 获取根日志记录器并添加 QueueHandler
# 注意:这里不再使用 basicConfig
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(queue_handler)
def worker(num):
"""线程工作函数"""
logger.info(f"Thread {num} has started.")
for i in range(3):
time.sleep(0.5)
logger.info(f"Thread {num} is working... count {i}")
logger.info(f"Thread {num} has finished.")
if __name__ == "__main__":
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i,), name=f"Worker-{i}")
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
logger.info("All threads have completed.")
# 5. 停止监听器
# 这非常重要!它会等待队列中的所有消息被处理完毕,然后停止监听线程
listener.stop()
代码解释:
QueueHandler和Queue: 创建了一个队列和一个QueueHandler,所有线程的日志都会被快速地put进这个队列。QueueListener: 监听器被配置了两个“处理器(file_handler和console_handler),它在一个单独的线程中运行,不断地从log_queue中get日志记录,并交给这些最终处理器处理。logger.addHandler(queue_handler): 我们将QueueHandler添加到根Logger中,这意味着整个应用程序的日志都会通过这个高效、线程安全的路径。listener.start()和listener.stop(): 启动和停止监听器。stop()是必须的,它可以确保在程序退出前,队列中剩余的所有日志都被正确处理和写入,避免日志丢失。
最佳实践总结
- 配置一次:在应用程序的入口点(通常是
if __name__ == "__main__":或main()函数)集中配置一次日志系统,避免在多个地方重复配置。 - 使用
%(threadName)s:在日志格式字符串中包含%(threadName)s,这能极大地帮助你追踪问题来源。 - 对于简单应用:直接使用
logging.basicConfig即可,它已经内置了线程安全机制。 - 对于高性能或复杂应用:强烈推荐使用
QueueHandler+QueueListener模式,它能显著提升性能,并将日志 I/O 与业务逻辑彻底解耦。 logging.getLogger()是线程安全的:每个线程调用logging.getLogger()获取Logger实例是线程安全的,返回的Logger实例会被缓存,所以多次调用同名getLogger返回的是同一个对象。- 记得清理:如果使用了
QueueListener,请在程序退出前调用listener.stop()来确保所有日志都被刷新。
通过遵循这些原则,你可以轻松、安全、高效地在 Python 多线程应用程序中实现日志记录。
