Skip to content

Commit

Permalink
Adapt for python 3.11 asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
Tihomir Heidelberg committed Jun 8, 2023
1 parent f7129c6 commit 5ac9b7b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 28 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = py-centrometal-web-boiler
version = 0.0.52
version = 0.0.53
author = Tihomir Heidelberg
author_email = tihomir.heidelberg@lite.hr
description = Python library to interact with Centrometal Boiler System.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='py-centrometal-web-boiler',
version='0.0.52',
version='0.0.53',
description='Python library to interact with Centrometal Boiler System.',
author='Tihomir Heidelberg',
author_email='tihomir.heidelberg@lite.hr',
Expand Down
102 changes: 76 additions & 26 deletions src/centrometal_web_boiler/ws/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,79 @@
from .wsprotocols import WSCProtocol
from .collector import EventCollector


class ClientSocket(BaseSocket):
def __init__(self):
super().__init__()
self.listeners['message'].append(self.on_message)
self.listeners['connect'].append(self.on_connect)
self.listeners['disconnect'].append(self.on_disconnect)
self.listeners["message"].append(self.on_message)
self.listeners["connect"].append(self.on_connect)
self.listeners["disconnect"].append(self.on_disconnect)
self.listeners.close.append(self.on_close)
self.connection = None
self.disconnection = None

def connect(self, uri: str, **kwargs):
kwargs.pop("create_protocol", None)
self.loop.run_until_complete(self.__main(uri, **kwargs))
self.loop.run_forever()

async def on_message(self, message):
pass

async def __message_consumer(self):
try:
async for message in self.connection:
try:
data = json.loads(message)
except JSONDecodeError:
data = message
message_cls: Message = Message(data=data, websocket=self.connection, created_at=datetime.utcnow())
self.loop.create_task(asyncio.wait(
[coro(message_cls) for coro in self.listeners['message']]+[self.__collector_verifier(futures, 'message', message_cls)
for futures in self.listeners.message_collector
]
))
message_cls: Message = Message(
data=data, websocket=self.connection, created_at=datetime.utcnow()
)
async with asyncio.TaskGroup() as tg:
for coro in self.listeners["message"]:
tg.create_task(coro(message_cls))
for futures in self.listeners.message_collector:
tg.create_task(
self.__collector_verifier(futures, "message", message_cls)
)
except ConnectionClosedError as e:
self.disconnection = Object({'code': e.code, 'reason': e.reason, 'disconnected': True})
self.loop.create_task(asyncio.wait([coro(e.code, e.reason) for coro in self.listeners.disconnect]+[
self.__collector_verifier(futures, 'disconnect', e.code, e.reason)
for futures in self.listeners.disconnect_collector
self.disconnection = Object(
{"code": e.code, "reason": e.reason, "disconnected": True}
)
self.loop.create_task(
asyncio.wait(
[coro(e.code, e.reason) for coro in self.listeners.disconnect]
+ [
self.__collector_verifier(
futures, "disconnect", e.code, e.reason
)
for futures in self.listeners.disconnect_collector
]
))
)
)
return e

async def __on_connect(self):
await asyncio.wait([coro() for coro in self.listeners['connect']])
async with asyncio.TaskGroup() as tg:
for coro in self.listeners["connect"]:
tg.create_task(coro())

async def on_connect(self):
pass

async def on_disconnect(self, code, reason):
pass

async def on_close(self, code, reason):
pass

async def __collector_verifier(self, futures, event, *event_data):
if futures[1](*event_data):
try:
futures[0].set_result(event_data[0] if len(event_data) == 1 else event_data)
futures[0].set_result(
event_data[0] if len(event_data) == 1 else event_data
)
self.listeners[f"{event}_collector"].remove(futures)
except asyncio.exceptions.InvalidStateError:
try:
Expand All @@ -64,19 +89,44 @@ async def __collector_verifier(self, futures, event, *event_data):
pass
except ValueError:
pass

def collector(self, time: float):
return EventCollector(websocket=self, time=time)

async def __main(self, uri, **kwargs):
self.connection = await websockets.connect(uri, create_protocol=WSCProtocol, **kwargs)
self.loop.create_task(self.__on_connect())
done, pending = await asyncio.wait([self.__message_consumer()], return_when=asyncio.ALL_COMPLETED)
if ConnectionClosedError in [type(ret.result()) for ret in done]: return
self.disconnection = Object({'code': self.connection.close_code, 'reason': self.connection.close_reason, 'disconnected': True})
await asyncio.wait([coro(self.connection.close_code, self.connection.close_reason) for coro in self.listeners.close]+[
self.__collector_verifier(futures, 'close', self.connection.close_code, self.connection.close_reason)
for futures in self.listeners.close_collector
])
self.connection = await websockets.connect(
uri, create_protocol=WSCProtocol, **kwargs
)
async with asyncio.TaskGroup() as tg:
tg.create_task(self.__on_connect())
done, pending = await asyncio.wait_for(self.__message_consumer(), timeout=None)
if ConnectionClosedError in [type(ret.result()) for ret in done]:
return
self.disconnection = Object(
{
"code": self.connection.close_code,
"reason": self.connection.close_reason,
"disconnected": True,
}
)
await asyncio.wait(
[
coro(self.connection.close_code, self.connection.close_reason)
for coro in self.listeners.close
]
+ [
self.__collector_verifier(
futures,
"close",
self.connection.close_code,
self.connection.close_reason,
)
for futures in self.listeners.close_collector
]
)

async def send(self, content: typing.Any = None, *, data: dict = None):
await self.connection.send(content=content, data=data)
async def close(self, code: int = 1000, reason: str = ''):

async def close(self, code: int = 1000, reason: str = ""):
await self.connection.close(code=code, reason=reason)

0 comments on commit 5ac9b7b

Please sign in to comment.