Skip to content

Commit

Permalink
Merge pull request #41 from VigneshVSV/develop
Browse files Browse the repository at this point in the history
Bug fix serializer customization for event affordance
  • Loading branch information
VigneshVSV authored Sep 22, 2024
2 parents 3d78a7b + b317010 commit d651a50
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 67 deletions.
12 changes: 9 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
✓ means ready to try

- cookie auth & its specification in TD (cookie auth branch)
- image event handlers (develop branch) for streaming live video as JPEG and PNG ✓
- pydantic support for property models (develop branch) ✓
- adding custom handlers for each property, action and event to override default behaviour
- pydantic support for property models

## [v0.2.6] - 2024-09-09

- bug fix events when multiple serializers are used
- events support custom HTTP handlers (not polished yet, use as last resort), not yet compatible with TD
- image event handlers for streaming live video as JPEG and PNG, not yet compatible with TD

## [v0.2.5] - 2024-09-09

- released to anaconda
- released to anaconda, it can take a while to turn up. A badge will be added in README when successful.

## [v0.2.4] - 2024-09-09

Expand Down
16 changes: 10 additions & 6 deletions hololinked/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ def load_thing(self):
elif data.what == ResourceTypes.EVENT:
assert isinstance(data, ServerSentEvent)
event = _Event(self.zmq_client, data.name, data.obj_name, data.unique_identifier, data.socket_address,
serializer=self.zmq_client.zmq_serializer, logger=self.logger)
serialization_specific=data.serialization_specific, serializer=self.zmq_client.zmq_serializer, logger=self.logger)
_add_event(self, event, data)
self.__dict__[data.name] = event

Expand Down Expand Up @@ -755,17 +755,19 @@ def oneway_set(self, value : typing.Any) -> None:

class _Event:

__slots__ = ['_zmq_client', '_name', '_obj_name', '_unique_identifier', '_socket_address', '_callbacks',
__slots__ = ['_zmq_client', '_name', '_obj_name', '_unique_identifier', '_socket_address', '_callbacks', '_serialization_specific',
'_serializer', '_subscribed', '_thread', '_thread_callbacks', '_event_consumer', '_logger']
# event subscription
# Dont add class doc otherwise __doc__ in slots will conflict with class variable

def __init__(self, client : SyncZMQClient, name : str, obj_name : str, unique_identifier : str, socket : str,
serializer : BaseSerializer = None, logger : logging.Logger = None) -> None:
serialization_specific : bool = False, serializer : BaseSerializer = None, logger : logging.Logger = None) -> None:
self._zmq_client = client
self._name = name
self._obj_name = obj_name
self._unique_identifier = unique_identifier
self._socket_address = socket
self._serialization_specific = serialization_specific
self._callbacks = None
self._serializer = serializer
self._logger = logger
Expand All @@ -781,9 +783,11 @@ def add_callbacks(self, callbacks : typing.Union[typing.List[typing.Callable], t

def subscribe(self, callbacks : typing.Union[typing.List[typing.Callable], typing.Callable],
thread_callbacks : bool = False):
self._event_consumer = EventConsumer(self._unique_identifier, self._socket_address,
f"{self._name}|RPCEvent|{uuid.uuid4()}", b'PROXY',
zmq_serializer=self._serializer, logger=self._logger)
self._event_consumer = EventConsumer(
'zmq-' + self._unique_identifier if self._serialization_specific else self._unique_identifier,
self._socket_address, f"{self._name}|RPCEvent|{uuid.uuid4()}", b'PROXY',
zmq_serializer=self._serializer, logger=self._logger
)
self.add_callbacks(callbacks)
self._subscribed = True
self._thread_callbacks = thread_callbacks
Expand Down
70 changes: 63 additions & 7 deletions hololinked/server/HTTPServer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from dataclasses import dataclass
import zmq
import zmq.asyncio
import logging
Expand All @@ -14,19 +15,34 @@
from ..param import Parameterized
from ..param.parameters import (Integer, IPAddress, ClassSelector, Selector, TypedList, String)
from .constants import ZMQ_PROTOCOLS, CommonRPC, HTTPServerTypes, ResourceTypes, ServerMessage
from .utils import get_IP_from_interface
from .utils import get_IP_from_interface, issubklass
from .dataklasses import HTTPResource, ServerSentEvent
from .utils import get_default_logger
from .serializers import JSONSerializer
from .database import ThingInformation
from .zmq_message_brokers import AsyncZMQClient, MessageMappedZMQClientPool
from .handlers import RPCHandler, BaseHandler, EventHandler, ThingsHandler, StopHandler
from .schema_validators import BaseSchemaValidator, JsonSchemaValidator
from .events import Event
from .eventloop import EventLoop
from .config import global_config




@dataclass
class InteractionAffordance:
URL_path : str
obj : Event # typing.Union[Property, Action, Event]
http_methods : typing.Tuple[str, typing.Optional[str], typing.Optional[str]]
handler : BaseHandler
kwargs : dict

def __eq__(self, other : "InteractionAffordance") -> bool:
return self.obj == other.obj



class HTTPServer(Parameterized):
"""
HTTP(s) server to route requests to ``Thing``.
Expand Down Expand Up @@ -63,7 +79,7 @@ class HTTPServer(Parameterized):
Unlike pure CORS, the server resource is not even executed if the client is not
an allowed client. if None any client is served.""")
host = String(default=None, allow_None=True,
doc="Host Server to subscribe to coordinate starting sequence of remote objects & web GUI" ) # type: str
doc="Host Server to subscribe to coordinate starting sequence of things & web GUI" ) # type: str
# network_interface = String(default='Ethernet',
# doc="Currently there is no logic to detect the IP addresss (as externally visible) correctly, \
# therefore please send the network interface name to retrieve the IP. If a DNS server is present, \
Expand Down Expand Up @@ -138,6 +154,7 @@ def __init__(self, things : typing.List[str], *, port : int = 8080, address : st
self._zmq_protocol = ZMQ_PROTOCOLS.IPC
self._zmq_inproc_socket_context = None
self._zmq_inproc_event_context = None
self._local_rules = dict() # type: typing.Dict[str, typing.List[InteractionAffordance]]

@property
def all_ok(self) -> bool:
Expand All @@ -147,6 +164,9 @@ def all_ok(self) -> bool:
f"{self.address}:{self.port}"),
self.log_level)

if self._zmq_protocol == ZMQ_PROTOCOLS.INPROC and (self._zmq_inproc_socket_context is None or self._zmq_inproc_event_context is None):
raise ValueError("Inproc socket context is not provided. Logic Error.")

self.app = Application(handlers=[
(r'/remote-objects', ThingsHandler, dict(request_handler=self.request_handler,
event_handler=self.event_handler)),
Expand Down Expand Up @@ -250,7 +270,7 @@ async def update_router_with_thing(self, client : AsyncZMQClient):
# Just to avoid duplication of this call as we proceed at single client level and not message mapped level
return
self._lost_things[client.instance_name] = client
self.logger.info(f"attempting to update router with remote object {client.instance_name}.")
self.logger.info(f"attempting to update router with thing {client.instance_name}.")
while True:
try:
await client.handshake_complete()
Expand All @@ -272,7 +292,13 @@ async def update_router_with_thing(self, client : AsyncZMQClient):
)))
elif http_resource["what"] == ResourceTypes.EVENT:
resource = ServerSentEvent(**http_resource)
handlers.append((instruction, self.event_handler, dict(
if resource.class_name in self._local_rules and any(ia.obj._obj_name == resource.obj_name for ia in self._local_rules[resource.class_name]):
for ia in self._local_rules[resource.class_name]:
if ia.obj._obj_name == resource.obj_name:
handlers.append((f'/{client.instance_name}{ia.URL_path}', ia.handler, dict(resource=resource, validator=None,
owner=self, **ia.kwargs)))
else:
handlers.append((instruction, self.event_handler, dict(
resource=resource,
validator=None,
owner=self
Expand Down Expand Up @@ -306,10 +332,11 @@ def __init__(
to make RPCHandler work
"""
self.app.wildcard_router.add_rules(handlers)
self.logger.info(f"updated router with remote object {client.instance_name}.")
self.logger.info(f"updated router with thing {client.instance_name}.")
break
except Exception as ex:
self.logger.error(f"error while trying to update router with remote object - {str(ex)}. " +
print("error", ex)
self.logger.error(f"error while trying to update router with thing - {str(ex)}. " +
"Trying again in 5 seconds")
await asyncio.sleep(5)

Expand All @@ -328,10 +355,39 @@ def __init__(
raise_client_side_exception=True
)
except Exception as ex:
self.logger.error(f"error while trying to update remote object with HTTP server details - {str(ex)}. " +
self.logger.error(f"error while trying to update thing with HTTP server details - {str(ex)}. " +
"Trying again in 5 seconds")
self.zmq_client_pool.poller.register(client.socket, zmq.POLLIN)
self._lost_things.pop(client.instance_name)


def add_event(self, URL_path : str, event : Event, handler : typing.Optional[BaseHandler] = None,
**kwargs) -> None:
"""
Add an event to be served by HTTP server
Parameters
----------
URL_path : str
URL path to access the event
event : Event
Event to be served
handler : BaseHandler, optional
custom handler for the event
kwargs : dict
additional keyword arguments to be passed to the handler's __init__
"""
if not isinstance(event, Event):
raise TypeError("event should be of type Event")
if not issubklass(handler, BaseHandler):
raise TypeError("handler should be subclass of BaseHandler")
if event.owner.__name__ not in self._local_rules:
self._local_rules[event.owner.__name__] = []
obj = InteractionAffordance(URL_path=URL_path, obj=event,
http_methods=('GET',), handler=handler or self.event_handler,
kwargs=kwargs)
if obj not in self._local_rules[event.owner.__name__]:
self._local_rules[event.owner.__name__].append(obj)


__all__ = [
Expand Down
20 changes: 17 additions & 3 deletions hololinked/server/dataklasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class HTTPResource(SerializableDataclass):
pass the request as a argument to the callable. For HTTP server ``tornado.web.HTTPServerRequest`` will be passed.
"""
what : str
class_name : str # just metadata
instance_name : str
obj_name : str
fullpath : str
Expand All @@ -298,10 +299,11 @@ class HTTPResource(SerializableDataclass):
request_as_argument : bool = field(default=False)


def __init__(self, *, what : str, instance_name : str, obj_name : str, fullpath : str,
def __init__(self, *, what : str, class_name : str, instance_name : str, obj_name : str, fullpath : str,
request_as_argument : bool = False, argument_schema : typing.Optional[JSON] = None,
**instructions) -> None:
self.what = what
self.class_name = class_name
self.instance_name = instance_name
self.obj_name = obj_name
self.fullpath = fullpath
Expand Down Expand Up @@ -340,6 +342,7 @@ class ZMQResource(SerializableDataclass):
argument schema of the method/action for validation before passing over the instruction to the RPC server.
"""
what : str
class_name : str # just metadata
instance_name : str
instruction : str
obj_name : str
Expand All @@ -350,10 +353,11 @@ class ZMQResource(SerializableDataclass):
return_value_schema : typing.Optional[JSON]
request_as_argument : bool = field(default=False)

def __init__(self, *, what : str, instance_name : str, instruction : str, obj_name : str,
def __init__(self, *, what : str, class_name : str, instance_name : str, instruction : str, obj_name : str,
qualname : str, doc : str, top_owner : bool, argument_schema : typing.Optional[JSON] = None,
return_value_schema : typing.Optional[JSON] = None, request_as_argument : bool = False) -> None:
self.what = what
self.class_name = class_name
self.instance_name = instance_name
self.instruction = instruction
self.obj_name = obj_name
Expand Down Expand Up @@ -390,7 +394,9 @@ class ServerSentEvent(SerializableDataclass):
"""
name : str = field(default=UNSPECIFIED)
obj_name : str = field(default=UNSPECIFIED)
class_name : str = field(default=UNSPECIFIED) # just metadata
unique_identifier : str = field(default=UNSPECIFIED)
serialization_specific : bool = field(default=False)
socket_address : str = field(default=UNSPECIFIED)
what : str = field(default=ResourceTypes.EVENT)

Expand All @@ -404,7 +410,7 @@ def build_our_temp_TD(instance):

assert isinstance(instance, Thing), f"got invalid type {type(instance)}"

our_TD = instance.get_thing_description()
our_TD = instance.get_thing_description(ignore_errors=True)
our_TD["inheritance"] = [class_.__name__ for class_ in instance.__class__.mro()]

for instruction, remote_info in instance.instance_resources.items():
Expand Down Expand Up @@ -470,13 +476,15 @@ def get_organised_resources(instance):

httpserver_resources[fullpath] = HTTPResource(
what=ResourceTypes.PROPERTY,
class_name=instance.__class__.__name__,
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
obj_name=remote_info.obj_name,
fullpath=fullpath,
**instructions
)
zmq_resources[fullpath] = ZMQResource(
what=ResourceTypes.PROPERTY,
class_name=instance.__class__.__name__,
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
instruction=fullpath,
doc=prop.__doc__,
Expand All @@ -494,6 +502,8 @@ def get_organised_resources(instance):
assert isinstance(prop._observable_event_descriptor, Event), f"observable event not yet set for {prop.name}. logic error."
evt_fullpath = f"{instance._full_URL_path_prefix}{prop._observable_event_descriptor.URL_path}"
dispatcher = EventDispatcher(evt_fullpath)
dispatcher._remote_info.class_name = instance.__class__.__name__
dispatcher._remote_info.serialization_specific = instance.zmq_serializer != instance.http_serializer
setattr(instance, prop._observable_event_descriptor._obj_name, dispatcher)
# prop._observable_event_descriptor._remote_info.unique_identifier = evt_fullpath
httpserver_resources[evt_fullpath] = dispatcher._remote_info
Expand All @@ -515,6 +525,7 @@ def get_organised_resources(instance):
# needs to be cleaned up for multiple HTTP methods
httpserver_resources[instruction] = HTTPResource(
what=ResourceTypes.ACTION,
class_name=instance.__class__.__name__,
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
obj_name=remote_info.obj_name,
fullpath=fullpath,
Expand All @@ -524,6 +535,7 @@ def get_organised_resources(instance):
)
zmq_resources[instruction] = ZMQResource(
what=ResourceTypes.ACTION,
class_name=instance.__class__.__name__,
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
instruction=instruction,
obj_name=getattr(resource, '__name__'),
Expand All @@ -545,6 +557,8 @@ def get_organised_resources(instance):
fullpath = f"{instance._full_URL_path_prefix}{resource.URL_path}"
# resource._remote_info.unique_identifier = fullpath
dispatcher = EventDispatcher(fullpath)
dispatcher._remote_info.class_name = instance.__class__.__name__
dispatcher._remote_info.serialization_specific = instance.zmq_serializer != instance.http_serializer
setattr(instance, name, dispatcher) # resource._remote_info.unique_identifier))
httpserver_resources[fullpath] = dispatcher._remote_info
zmq_resources[fullpath] = dispatcher._remote_info
Expand Down
Loading

0 comments on commit d651a50

Please sign in to comment.