perf: log system - keep file handle open + async queue writes

- Replace open/write/flush/close per log call with persistent file handle
- Use threading.Queue + background daemon thread for non-blocking writes
- Only flush on exception/critical levels or periodically (every 1s idle)
- Queue full protection: drop oldest entry instead of blocking event loop
- Eliminates disk I/O blocking on slow storage (NFS/cloud disk) during high traffic
This commit is contained in:
yumoqing 2026-05-26 13:18:47 +08:00
parent afe7637966
commit 5238a08309
25 changed files with 410 additions and 24 deletions

58
appPublic/README.md Normal file
View File

@ -0,0 +1,58 @@
# EventDispatcher
生产级异步事件调度器。
## 特性
- 支持普通函数
- 支持 async 协程
- 支持实例方法
- 弱引用自动GC
- 异常隔离
- 超时控制
- 自定义错误处理
---
# 使用示例
```python
import asyncio
from event_dispatcher import EventDispatcher
def on_login(data):
print(data)
async def main():
dispatcher = EventDispatcher()
dispatcher.bind(
"login",
on_login
)
await dispatcher.dispatch(
"login",
{
"user": "张三"
}
)
asyncio.run(main())
```
---
# 项目结构
```text
event_dispatcher_project/
├── event_dispatcher.py
└── README.md
```

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

289
appPublic/k Normal file
View File

@ -0,0 +1,289 @@
#!/usr/bin/env bash
set -e
PROJECT_DIR="event_dispatcher_project"
mkdir -p "${PROJECT_DIR}"
BT='```'
# =========================================================
# event_dispatcher.py
# =========================================================
cat > "${PROJECT_DIR}/event_dispatcher.py" <<'PYEOF'
import asyncio
import inspect
import traceback
import weakref
from typing import Callable, Any
class WeakCallback:
def __init__(self, func: Callable):
self._is_coroutine = inspect.iscoroutinefunction(func)
if inspect.ismethod(func):
self._ref = weakref.WeakMethod(func)
else:
self._ref = weakref.ref(func)
self._hash = hash(func)
@property
def is_coroutine(self):
return self._is_coroutine
def get(self):
return self._ref()
def __eq__(self, other):
return isinstance(other, WeakCallback) and self._hash == other._hash
def __hash__(self):
return self._hash
class EventDispatcher:
def __init__(
self,
*,
continue_on_error=True,
log_traceback=True,
handler_timeout=None,
error_handler=None,
):
self._events = {}
self.continue_on_error = continue_on_error
self.log_traceback = log_traceback
self.handler_timeout = handler_timeout
self.error_handler = error_handler
def bind(self, event_name: str, func: Callable):
if event_name not in self._events:
self._events[event_name] = set()
self._events[event_name].add(WeakCallback(func))
def unbind(self, event_name: str, func: Callable):
if event_name not in self._events:
return
target = WeakCallback(func)
self._events[event_name] = {
cb for cb in self._events[event_name]
if cb != target
}
if not self._events[event_name]:
del self._events[event_name]
async def _run_error_handler(
self,
event_name,
func,
exc,
):
if not self.error_handler:
return
try:
if inspect.iscoroutinefunction(self.error_handler):
await self.error_handler(
event_name,
func,
exc,
)
else:
self.error_handler(
event_name,
func,
exc,
)
except Exception as e:
print(f"[EventDispatcher] error_handler failed: {e}")
async def _execute_handler(
self,
cb,
event_name,
data,
):
func = cb.get()
if func is None:
return False
try:
if cb.is_coroutine:
coro = func(data)
if self.handler_timeout:
await asyncio.wait_for(
coro,
timeout=self.handler_timeout,
)
else:
await coro
else:
if self.handler_timeout:
await asyncio.wait_for(
asyncio.to_thread(func, data),
timeout=self.handler_timeout,
)
else:
func(data)
return True
except Exception as e:
print(
f"[EventDispatcher] "
f"handler failed: "
f"event={event_name}, "
f"handler={func}"
)
if self.log_traceback:
traceback.print_exc()
await self._run_error_handler(
event_name,
func,
e,
)
if not self.continue_on_error:
raise
return True
async def dispatch(
self,
event_name: str,
data: Any = None,
):
if event_name not in self._events:
return
dead_callbacks = []
for cb in list(self._events[event_name]):
func = cb.get()
if func is None:
dead_callbacks.append(cb)
continue
await self._execute_handler(
cb,
event_name,
data,
)
for cb in dead_callbacks:
self._events[event_name].discard(cb)
if (
event_name in self._events
and not self._events[event_name]
):
del self._events[event_name]
PYEOF
# =========================================================
# README.md
# =========================================================
cat > "${PROJECT_DIR}/README.md" <<EOF
# EventDispatcher
生产级异步事件调度器。
## 特性
- 支持普通函数
- 支持 async 协程
- 支持实例方法
- 弱引用自动GC
- 异常隔离
- 超时控制
- 自定义错误处理
---
# 使用示例
${BT}python
import asyncio
from event_dispatcher import EventDispatcher
def on_login(data):
print(data)
async def main():
dispatcher = EventDispatcher()
dispatcher.bind(
"login",
on_login
)
await dispatcher.dispatch(
"login",
{
"user": "张三"
}
)
asyncio.run(main())
${BT}
---
# 项目结构
${BT}text
event_dispatcher_project/
├── event_dispatcher.py
└── README.md
${BT}
EOF
echo "================================================="
echo "生成完成:"
echo " ${PROJECT_DIR}"
echo "================================================="
echo
echo "查看:"
echo
echo " cat ${PROJECT_DIR}/README.md"
echo

View File

@ -1,5 +1,7 @@
import sys
import codecs
import threading
import queue
from traceback import format_exc
from appPublic.timeUtils import timestampstr
from appPublic.Singleton import SingletonDecorator
@ -10,7 +12,6 @@ def my_function():
caller_frame = frame_info.f_back
file_name = inspect.getframeinfo(caller_frame).filename
line_number = inspect.getframeinfo(caller_frame).lineno
# print(f"Called from file: {file_name}, line: {line_number}")
@SingletonDecorator
@ -30,41 +31,80 @@ class MyLogger:
self.levelname = levelname
self.level = self.levels.get(levelname)
self.logfile = logfile
def open_logger(self):
self.logger = None
# Async write queue + background thread
self._q = queue.Queue(maxsize=10000)
self._worker = threading.Thread(target=self._write_loop, daemon=True)
self._worker.start()
def _get_logger(self):
"""Lazy open file handle, keep it open."""
if self.logger is not None:
return self.logger
if self.logfile:
self.logger = codecs.open(self.logfile, 'a', 'utf-8')
else:
self.logger = sys.stdout
def close_logger(self):
if self.logfile:
self.logger.close();
self.logger = None
self.logger = None
return self.logger
def _write_loop(self):
"""Background thread: drain queue and write to file."""
fh = None
while True:
try:
item = self._q.get(timeout=1.0)
if item is None:
# Poison pill: shut down
if fh is not None:
try:
fh.flush()
if fh is not sys.stdout:
fh.close()
except Exception:
pass
break
if fh is None:
fh = self._get_logger()
fh.write(item)
# Only flush on critical/exception to avoid blocking on slow disks
if item.find('[exception]') >= 0 or item.find('[critical]') >= 0:
fh.flush()
except queue.Empty:
# Periodic flush to prevent data loss on crash
if fh is not None:
try:
fh.flush()
except Exception:
pass
except Exception:
pass
def log(self, levelname, message, frame_info):
caller_frame = frame_info.f_back
filename = inspect.getframeinfo(caller_frame).filename
lineno = inspect.getframeinfo(caller_frame).lineno
level = self.levels.get(levelname)
if level > self.level:
# print(f'{level=},{self.level=}')
return
data = {
'timestamp':timestampstr(),
'name':self.name,
'levelname':levelname,
'message':message,
'filename':filename,
'lineno':lineno
'timestamp': timestampstr(),
'name': self.name,
'levelname': levelname,
'message': message,
'filename': filename,
'lineno': lineno
}
self.open_logger()
s = self.formater % data
self.logger.write(s)
self.logger.flush()
self.close_logger()
try:
self._q.put_nowait(s)
except queue.Full:
# Queue full: drop oldest (non-blocking, never stall the event loop)
try:
self._q.get_nowait()
self._q.put_nowait(s)
except Exception:
pass
def clientinfo(message):
frame_info = inspect.currentframe()
logger = MyLogger('Test')
@ -97,8 +137,7 @@ def critical(message):
def exception(message):
frame_info = inspect.currentframe()
tb_msg = format_exc()
tb_msg = format_exc()
msg = f'{message}\n{tb_msg}'
logger = MyLogger('exception')
logger.log('exception', msg, frame_info)