Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ready made ws support #2

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add ws client/server
  • Loading branch information
jryannel committed Mar 30, 2023

Verified

This commit was signed with the committer’s verified signature. The key has expired.
addaleax Anna Henningsen
commit ef30e189487d279e9b0f67cb82dc72ac8881bf45
38 changes: 38 additions & 0 deletions src/olink/ws/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import websockets as ws
from .emitter import Emitter


class Client:
send_queue = asyncio.Queue()
recv_queue = asyncio.Queue()
node = None
def __init__(self, node):
self.node = node

def send(self, msg):
self.send_queue.put_nowait(msg)

async def handle_send(self):
async for msg in self.send_queue:
data = self.serializer.serialize(msg)
await self.conn.send(data)

async def handle_recv(self):
async for msg in self.recv_queue:
self.emitter.emit(msg.object, msg)

async def recv(self):
async for data in self.conn:
msg = self.serializer.deserialize(data)
self.recv_queue.put_nowait(msg)

async def connect(self, addr: str):
# connect to server
async for conn in ws.connect(addr):
self.conn = conn
# start send and recv tasks
await asyncio.gather(self.handle_send(), self.handle_recv(), self.recv())
# wait for all queues to be empty
await self.send_queue.join()
await self.recv_queue.join()
14 changes: 14 additions & 0 deletions src/olink/ws/emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

class Emitter:
def __init__(self):
self._callbacks = {}

def on(self, event, callback):
self._callbacks[event] = callback

def emit(self, event, *args):
self._callbacks[event](*args)

def off(self, event):
self._callbacks.pop(event)

85 changes: 85 additions & 0 deletions src/olink/ws/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import websockets as ws
from .emitter import Emitter
from typing import Any
import asyncio
from .client import Client
from ..remotenode import IObjectSource, RemoteNode

class SourceAdapter(IObjectSource):
node: RemoteNode = None
object_id: str = None
def __init__(self, objectId: str, impl) -> None:
self.object_id = objectId
self.impl = impl
RemoteNode.register_source(self)

def olink_object_name() -> str:
return self.objectId

def olink_invoke(self, name: str, args: list[Any]) -> Any:
path = Name.path_from_name(name)
func = getattr(self.impl, path)
try:
result = func(**args)
except Exception as e:
print('error: %s' % e)
result = None
return result

def olink_set_property(self, name: str, value: Any):
# set property value on implementation
path = Name.path_from_name(name)
setattr(self, self.impl, value)

def olink_linked(self, name: str, node: "RemoteNode"):
# called when the source is linked to a client node
self.node = node

def olink_collect_properties(self) -> object:
# collect properties from implementation to send back to client node initially
return {k: getattr(self.impl, k) for k in ['count']}


class RemotePipe:
send_queue = asyncio.Queue()
recv_queue = asyncio.Queue()
node = RemoteNode()
def __init__(self, conn: ws.ClientConnection):
self.conn = conn
self.node.on_write(self._send)

def _send(self, data):
self.send_queue.put_nowait(data)

async def handle_send(self):
async for data in self.send_queue:
await self.conn.send(data)

async def handle_recv(self):
async for data in self.recv_queue:
self.node.handle_message(data)

async def recv(self):
async for data in self.conn:
self.recv_queue.put_nowait(data)

class Server:
pipes = []
def handle_connection(self, pipe: ws.WebSocketServerProtocol, path: str):
pipe = RemotePipe(pipe, self.serializer)
self.pipes.append(pipe)

async def serve(self, host: str, port: int):
async with ws.serve(self.handle_connection, host, port):
await asyncio.Future()




def run_server(host: str, port: int):
server = Server()
asyncio.run(server.serve(host, port))


if __name__ == "__main__":
run_server("localhost", 8152)