-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevents.py
32 lines (25 loc) · 1.1 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
class EventBus:
def __init__(self):
self.subscribers: dict[str|int,list] = {}
async def subscribe(self, event_type, handler):
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
async def unsubscribe(self, event_type, handler):
if (
event_type in self.subscribers
and handler in self.subscribers[event_type]
):
self.subscribers[event_type].remove(handler)
async def publish(self, event_type, *args, **kwargs):
if event_type in self.subscribers:
try:
async with asyncio.TaskGroup() as tg:
for handler in self.subscribers[event_type]:
tg.create_task(handler(*args, **kwargs))
except* Exception as e:
for error in e.exceptions:
logging.error(f"Error while executing handler {handler.__name__} for event {event_type}: {error}")