在真实的工厂车间里,一台设备的异常报警可能意味着数十万的损失。而那条关键的错误日志,往往就是唯一的"案发现场还原"线索。
说真的,工控软件的日志问题,是我职业生涯里踩得最深的坑之一。
刚入行那会儿,我负责维护一套PLC通信程序。某天凌晨两点,产线突然停了。翻遍整个系统,日志文件里只有寥寥几行print("error")——连时间戳都没有。那一夜,我和同事对着设备发呆了三个小时,愣是没定位到根因。
这种痛,相信很多做工控、MES、SCADA系统的朋友都懂。
工业场景的日志需求,和普通Web应用完全不是一个量级:多线程并发写入、跨设备数据聚合、毫秒级时序追踪、海量数据的长期归档……随便拎出一条,都够折腾一阵子。
本文就从实战出发,带你搭一套真正能在工业环境里"扛造"的Python日志系统。代码全部可运行,架构可直接迁移到生产项目。
先把问题说清楚,再谈方案。
普通应用的日志,核心诉求是记录和排错。但工业系统的日志,承担的职责要复杂得多——
时序精度要求极高。 一条焊接指令和一条质检结果,如果时间戳偏差超过50ms,数据关联就会失效。这不是"差不多"能过去的事。
多源并发是常态。 同一时刻,温控模块、运动控制模块、视觉检测模块可能同时在写日志。锁竞争、写入顺序错乱,是家常便饭。
日志本身就是业务数据。 工厂的质量追溯、工艺优化,全靠历史日志。这意味着日志不能丢、不能乱、还得方便查。
存储压力大。 一条产线一天可能产生几个GB的原始日志。怎么压缩、怎么分片、怎么归档,都得提前设计好。
带着这四个问题,咱们开始搭架子。
太多项目,把所有日志逻辑塞进一个utils.py,然后在几十个模块里import来import去。这玩意儿,短期看没问题,长期必然是一团乱麻。
工业日志系统,我建议采用三层架构:

业务层不关心日志怎么存、存哪里。门面层负责统一收口、注入设备ID/工单号等上下文。处理器层各司其职,互不干扰。
Python标准库的logging模块本身是线程安全的——但很多人不知道,FileHandler在Windows下的多进程写入是有问题的。工控机上跑多进程采集的场景,必须用RotatingFileHandler配合文件锁,或者直接上队列方案。
pythonimport logging
import logging.handlers
import threading
import queue
from datetime import datetime
from pathlib import Path
class IndustrialLoggerFactory:
"""
工业日志工厂类
核心设计:单例 + 异步队列写入,避免IO阻塞业务线程
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.log_dir = Path("logs")
self.log_dir.mkdir(exist_ok=True)
# 异步队列:业务线程只管往队列扔,IO线程负责实际写入
self._log_queue = queue.Queue(maxsize=10000)
self._setup_handlers()
self._start_async_worker()
self._initialized = True
def _setup_handlers(self):
"""配置多目标Handler"""
self.logger = logging.getLogger("industrial_system")
self.logger.setLevel(logging.DEBUG)
# 按日期滚动的文件Handler
file_handler = logging.handlers.TimedRotatingFileHandler(
filename=self.log_dir / "system.log",
when="midnight", # 每天零点切割
interval=1,
backupCount=90, # 保留90天
encoding="utf-8"
)
# 关键告警单独存一份,方便快速检索
alarm_handler = logging.handlers.RotatingFileHandler(
filename=self.log_dir / "alarm.log",
maxBytes=50 * 1024 * 1024, # 50MB切割
backupCount=20,
encoding="utf-8"
)
alarm_handler.setLevel(logging.WARNING)
formatter = IndustrialFormatter()
file_handler.setFormatter(formatter)
alarm_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(alarm_handler)
def _start_async_worker(self):
"""启动后台IO线程,消费日志队列"""
worker = threading.Thread(
target=self._async_write_worker,
daemon=True, # 随主进程退出,不阻塞关闭
name="LogIOWorker"
)
worker.start()
def _async_write_worker(self):
while True:
try:
record = self._log_queue.get(timeout=1)
if record is None: # 优雅停止信号
break
self.logger.handle(record)
except queue.Empty:
continue
这里有个细节值得注意:daemon=True让IO线程随主进程退出,但这意味着程序崩溃时队列里可能还有未写入的日志。生产环境里,建议在主程序的finally块里发送停止信号,等队列清空再退出。
标准的日志格式对工业系统来说信息量严重不足。我们需要在每条日志里自动带上设备编号、工单号、操作员ID这些关键字段。
pythonimport json
import traceback
class IndustrialFormatter(logging.Formatter):
"""
结构化日志格式器
输出JSON格式,方便后续用ELK或自研平台解析
"""
# 线程本地存储:每个线程独立维护自己的上下文
_context = threading.local()
@classmethod
def set_context(cls, **kwargs):
"""在业务代码入口处设置上下文,后续日志自动携带"""
for key, value in kwargs.items():
setattr(cls._context, key, value)
@classmethod
def clear_context(cls):
cls._context.__dict__.clear()
def format(self, record: logging.LogRecord) -> str:
# 基础字段
log_entry = {
"timestamp": datetime.fromtimestamp(record.created).strftime(
"%Y-%m-%d %H:%M:%S.%f"
)[:-3], # 精确到毫秒
"level": record.levelname,
"module": record.module,
"func": record.funcName,
"line": record.lineno,
"message": record.getMessage(),
# 从线程本地存储中读取业务上下文
"device_id": getattr(self._context, "device_id", "UNKNOWN"),
"work_order": getattr(self._context, "work_order", None),
"operator": getattr(self._context, "operator", None),
}
# 异常信息单独格式化,保留完整堆栈
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info)
}
# 过滤None值,减少存储冗余
log_entry = {k: v for k, v in log_entry.items() if v is not None}
return json.dumps(log_entry, ensure_ascii=False)
线程本地存储(threading.local())是这里的关键。 每个采集线程处理不同设备,上下文互不污染。在线程入口处调一次set_context(device_id="PLC_001"),后续这个线程产生的所有日志都会自动带上设备ID。
有了底层基础设施,再封装一个面向业务的门面类。让业务代码写日志时,不需要关心任何底层细节。
pythonfrom contextlib import contextmanager
from functools import wraps
import time
class DeviceLogger:
"""
设备日志门面
业务模块统一通过这个类记录日志
"""
def __init__(self, device_id: str, device_type: str = "GENERIC"):
self.device_id = device_id
self.device_type = device_type
self._factory = IndustrialLoggerFactory()
# 初始化时就把设备信息注入上下文
IndustrialFormatter.set_context(
device_id=device_id,
device_type=device_type
)
def info(self, message: str, **extra):
self._log(logging.INFO, message, **extra)
def warning(self, message: str, **extra):
self._log(logging.WARNING, message, **extra)
def error(self, message: str, exc_info=False, **extra):
self._log(logging.ERROR, message, exc_info=exc_info, **extra)
def critical(self, message: str, **extra):
self._log(logging.CRITICAL, message, **extra)
def _log(self, level: int, message: str, exc_info=False, **extra):
record = logging.LogRecord(
name="industrial_system",
level=level,
pathname="",
lineno=0,
msg=message,
args=(),
exc_info=logging.sys.exc_info() if exc_info else None
)
# 把extra字段挂到record上,Formatter可以读取
for key, value in extra.items():
setattr(record, key, value)
# 非阻塞放入队列
try:
self._factory._log_queue.put_nowait(record)
except queue.Full:
# 队列满了,这条日志只能丢弃——但这本身也是个告警信号
print(f"[WARN] Log queue full, dropping: {message}")
@contextmanager
def operation_trace(self, operation_name: str):
"""
上下文管理器:自动记录操作耗时
用法:with logger.operation_trace("焊接动作"):
"""
start = time.perf_counter()
self.info(f"[START] {operation_name}")
try:
yield
elapsed = (time.perf_counter() - start) * 1000
self.info(f"[END] {operation_name} | 耗时: {elapsed:.2f}ms")
except Exception as e:
elapsed = (time.perf_counter() - start) * 1000
self.error(
f"[FAILED] {operation_name} | 耗时: {elapsed:.2f}ms | 原因: {e}",
exc_info=True
)
raise # 异常继续向上传播,不在日志层吞掉
def log_device_action(operation: str):
"""
装饰器版本:给函数自动加上日志追踪
适合那些每次调用都需要记录的设备操作函数
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# 假设self上有logger属性
logger = getattr(self, 'logger', None)
if logger and hasattr(logger, 'operation_trace'):
with logger.operation_trace(operation):
return func(self, *args, **kwargs)
return func(self, *args, **kwargs)
return wrapper
return decorator
把上面这些拼起来,看看实际用起来是什么感觉:
pythonclass PLCController:
"""PLC控制器示例:展示日志系统在真实业务中的用法"""
def __init__(self, plc_id: str):
self.plc_id = plc_id
self.logger = DeviceLogger(
device_id=plc_id,
device_type="SIEMENS_S7"
)
self.connected = False
@log_device_action("PLC连接建立")
def connect(self, ip: str, port: int = 102):
"""建立PLC连接"""
self.logger.info(f"尝试连接 {ip}:{port}")
# 模拟连接逻辑...
self.connected = True
self.logger.info("连接成功", ip=ip, port=port)
def read_registers(self, start_addr: int, count: int) -> list:
"""读取寄存器数据"""
if not self.connected:
self.logger.error("读取失败:设备未连接")
raise ConnectionError("PLC未连接")
with self.logger.operation_trace(f"读取寄存器 DB{start_addr}[0..{count}]"):
# 实际项目里这里是snap7或pycomm3的调用
data = [0] * count
self.logger.info(
f"寄存器读取完成",
start_addr=start_addr,
count=count,
sample_value=data[0] if data else None
)
return data
def write_coil(self, addr: int, value: bool):
"""写线圈——这类操作必须有完整的操作记录"""
IndustrialFormatter.set_context(
device_id=self.plc_id,
action_type="WRITE",
target_addr=addr
)
self.logger.warning(
f"写入线圈 addr={addr}, value={value}",
safety_level="MEDIUM"
)
# 使用示例
if __name__ == "__main__":
# 设置当前工单上下文
IndustrialFormatter.set_context(
work_order="WO-20260318-001",
operator="张工"
)
plc = PLCController("PLC_LINE_A_01")
plc.connect("192.168.1.100")
data = plc.read_registers(start_addr=100, count=10)
plc.write_coil(addr=200, value=True)
运行后,logs/system.log里的每一行都是结构化JSON,长这样:
json{
"timestamp": "2026-03-18 07:16:49.698",
"level": "INFO",
"module": "plc_controller",
"func": "connect",
"line": 28,
"message": "连接成功",
"device_id": "PLC_LINE_A_01",
"device_type": "SIEMENS_S7",
"work_order": "WO-20260318-001",
"operator": "张工",
"ip": "192.168.1.100",
"port": 102
}

坑1:日志队列满了怎么办? 上面代码里用的是put_nowait,队列满直接丢弃。这在大多数场景是合理的——日志不能反过来阻塞业务。但如果你的场景要求日志绝对不丢,可以改成put(block=True, timeout=0.1),超时后写到应急文件。
坑2:Windows下的文件时间戳精度问题。 datetime.now()在Windows上的精度只有约15ms,不足以区分高频操作。建议改用time.perf_counter()计算相对时间差,或者引入time.time_ns()。
坑3:JSON序列化遇到numpy类型。 工控采集数据里经常有numpy.float32、numpy.int64这类类型,直接json.dumps会报错。需要自定义一个JSONEncoder:
pythonimport numpy as np
class IndustrialJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
# 使用时替换json.dumps的encoder参数
json.dumps(log_entry, cls=IndustrialJSONEncoder, ensure_ascii=False)
坑4:多进程场景下的Handler冲突。 如果你的系统是多进程架构(比如用multiprocessing跑多个设备采集进程),每个进程都有自己的FileHandler,同时写同一个文件会导致内容交错。标准解法是用QueueHandler + 独立的日志服务进程,或者每个进程写独
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!