From fb25b4a28db4fb2c7547babc8a398a6bb84886d2 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Fri, 15 May 2026 16:19:58 +0800 Subject: [PATCH] bugfix --- README.md | 47 ++++++++ appPublic/event_dispatcher.py | 198 ++++++++++++++++++++++++++++++++++ 2 files changed, 245 insertions(+) create mode 100644 appPublic/event_dispatcher.py diff --git a/README.md b/README.md index cf78d89..eb52656 100755 --- a/README.md +++ b/README.md @@ -1,3 +1,50 @@ # appPublic a set of icommon modules for python development +## EventDispatcher + +生产级异步事件调度器。 + +### 特性 + +- 支持普通函数 +- 支持 async 协程 +- 支持实例方法 +- 弱引用自动GC +- 异常隔离 +- 超时控制 +- 自定义错误处理 + +--- + +### 使用示例 + +```python +import asyncio + +from event_dispatcher import EventDispatcher + + +def on_login(data): + print(data) + + +async def main(): + + dispatcher = EventDispatcher() + + dispatcher.bind( + "login", + on_login + ) + + await dispatcher.dispatch( + "login", + { + "user": "张三" + } + ) + + +asyncio.run(main()) +``` diff --git a/appPublic/event_dispatcher.py b/appPublic/event_dispatcher.py new file mode 100644 index 0000000..d892e48 --- /dev/null +++ b/appPublic/event_dispatcher.py @@ -0,0 +1,198 @@ +import asyncio +import inspect +import traceback +import weakref +from typing import Callable, Any + + +class WeakCallback: + + def __init__(self, func: Callable): + + self._is_coroutine = inspect.iscoroutinefunction(func) + + if inspect.ismethod(func): + self._ref = weakref.WeakMethod(func) + else: + self._ref = weakref.ref(func) + + self._hash = hash(func) + + @property + def is_coroutine(self): + return self._is_coroutine + + def get(self): + return self._ref() + + def __eq__(self, other): + return isinstance(other, WeakCallback) and self._hash == other._hash + + def __hash__(self): + return self._hash + + +class EventDispatcher: + + def __init__( + self, + *, + continue_on_error=True, + log_traceback=True, + handler_timeout=None, + error_handler=None, + ): + + self._events = {} + + self.continue_on_error = continue_on_error + self.log_traceback = log_traceback + self.handler_timeout = handler_timeout + self.error_handler = error_handler + + def bind(self, event_name: str, func: Callable): + + if event_name not in self._events: + self._events[event_name] = set() + + self._events[event_name].add(WeakCallback(func)) + + def unbind(self, event_name: str, func: Callable): + + if event_name not in self._events: + return + + target = WeakCallback(func) + + self._events[event_name] = { + cb for cb in self._events[event_name] + if cb != target + } + + if not self._events[event_name]: + del self._events[event_name] + + async def _run_error_handler( + self, + event_name, + func, + exc, + ): + + if not self.error_handler: + return + + try: + + if inspect.iscoroutinefunction(self.error_handler): + await self.error_handler( + event_name, + func, + exc, + ) + else: + self.error_handler( + event_name, + func, + exc, + ) + + except Exception as e: + print(f"[EventDispatcher] error_handler failed: {e}") + + async def _execute_handler( + self, + cb, + event_name, + data, + ): + + func = cb.get() + + if func is None: + return False + + try: + + if cb.is_coroutine: + + coro = func(data) + + if self.handler_timeout: + await asyncio.wait_for( + coro, + timeout=self.handler_timeout, + ) + else: + await coro + + else: + + if self.handler_timeout: + + await asyncio.wait_for( + asyncio.to_thread(func, data), + timeout=self.handler_timeout, + ) + + else: + func(data) + + return True + + except Exception as e: + + print( + f"[EventDispatcher] " + f"handler failed: " + f"event={event_name}, " + f"handler={func}" + ) + + if self.log_traceback: + traceback.print_exc() + + await self._run_error_handler( + event_name, + func, + e, + ) + + if not self.continue_on_error: + raise + + return True + + async def dispatch( + self, + event_name: str, + data: Any = None, + ): + + if event_name not in self._events: + return + + dead_callbacks = [] + + for cb in list(self._events[event_name]): + + func = cb.get() + + if func is None: + dead_callbacks.append(cb) + continue + + await self._execute_handler( + cb, + event_name, + data, + ) + + for cb in dead_callbacks: + self._events[event_name].discard(cb) + + if ( + event_name in self._events + and not self._events[event_name] + ): + del self._events[event_name]