Skip to content

Commit

Permalink
Merge pull request #26 from FreeTAKTeam/service-support
Browse files Browse the repository at this point in the history
Service support
  • Loading branch information
naman108 authored Jan 6, 2023
2 parents be9bf96 + 58fa340 commit 848d3ae
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 218 deletions.
1 change: 1 addition & 0 deletions digitalpy/core/main/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Callable
from digitalpy.core.main.event import Event

# TODO decide whether or not to deprecate this class
class EventManager(ABC):
"""EventManager is responsible for dispatching events to registered listeners."""

Expand Down
2 changes: 1 addition & 1 deletion digitalpy/core/main/state_change_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#######################################################
from typing import Any
from digitalpy.core.persistence.persistent_object import PersistentObject
from digitalpy.core.event import Event
from digitalpy.core.main.event import Event

class StateChangeEvent(Event):
"""StateChangeEvent signals a change of the state of a PersistentObject instance.
Expand Down
44 changes: 36 additions & 8 deletions digitalpy/core/service_management/digitalpy_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@
#######################################################
from digitalpy.core.zmanager.service import Service
from digitalpy.core.zmanager.impl.zmq_subscriber import ZmqSubscriber
from digitalpy.core.zmanager.impl.zeroless_pusher import ZerolessPusher
from digitalpy.core.zmanager.impl.zmq_pusher import ZMQPusher
from digitalpy.core.parsing.formatter import Formatter

class DigitalPyService(Service, ZmqSubscriber, ZerolessPusher):
class DigitalPyService(Service, ZmqSubscriber, ZMQPusher):
# on the reception of messages from the subscriber interface or the socket
#TODO: what is the service manager supposed to do? is this going to be a new service

def __init__(self, service_id: str, subject_address: str, subject_port: int, integration_manager_address: str, integration_manager_port: int, formatter: Formatter):
def __init__(self, service_id: str, subject_address: str, subject_port: int, subject_protocol: str, integration_manager_address: str, integration_manager_port: int, integration_manager_protocol: str, formatter: Formatter):
"""the constructor for the digitalpy service class
Args:
service_id (str): the unique id of the service inheriting from DigitalpyService
subject_address (str): the address of the zmanager "subject"
subject_port (int): the port of the zmanager "subject"
subject_protocol (str): the protocol of the zmanager "subject"
integration_manager_address (str): the address of the zmanager "integration_manager"
integration_manager_port (int): the port of the zmanager "integration_manager"
integration_manager_protocol (str): the protocol of the zmanager "integration_manager"
formatter (Formatter): the formatter used by the service to serialize the request values to and from messages, (should be injected by object factory)
"""
Service.__init__(self)
ZmqSubscriber.__init__(self, formatter)
ZerolessPusher.__init__(self, formatter)
ZMQPusher.__init__(self, formatter)
self.subject_address = subject_address
self.subject_port = subject_port
self.subject_protocol = subject_protocol
self.integration_manager_address = integration_manager_address
self.integration_port_address = integration_manager_port
self.integration_manager_port = integration_manager_port
self.integration_manager_protocol = integration_manager_protocol
self.service_id = service_id

def discovery(self):
Expand All @@ -37,6 +51,20 @@ def send_heart_beat(self):
# TODO: once the service manager has been well defined then we will need
# to define the format for this service.

def initialize_connections(self):
ZerolessPusher.initiate_connections(self, self.subject_port, self.subject_address)
self.broker_connect(self.integration_port_address, self.integration_manager_address,self.service_id)
def initialize_connections(self, application_protocol: str):
"""initialize connections to the subject and the integration manager
Args:
application_protocol (str): the application protocol of the service
"""
ZMQPusher.initiate_connections(self, self.subject_port, self.subject_address)
self.broker_connect(self.integration_manager_address, self.integration_manager_port, self.integration_manager_protocol, self.service_id, application_protocol)

def __getstate__(self):
ZmqSubscriber.__getstate__(self)
ZMQPusher.__getstate__(self)
return self.__dict__

def __setstate__(self, state):
ZmqSubscriber.__setstate__(self, state)
ZMQPusher.__setstate__(self, self.__dict__)
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ def register_service(
)

if ServiceRegistrationHandler.validate_manifest(
facade_instance.get_manifest(), service_name
service_instance.get_manifest(), service_name
):
facade_instance.register(config)
service_instance.register(config)
else:
return False
else:
return False
ServiceRegistrationHandler.save_service(facade_instance.get_manifest(), service_name)
ServiceRegistrationHandler.save_service(service_instance.get_manifest(), service_name)
return True
except Exception as e:
# must use a print because logger may not be available
Expand All @@ -93,6 +93,12 @@ def register_service(

@staticmethod
def save_service(manifest: Configuration, service_name: str):
"""save a service to the service registry
Args:
manifest (Configuration): _description_
service_name (str): _description_
"""
section = manifest.get_section(service_name + MANIFEST, include_meta=True)
ServiceRegistrationHandler.registered_services[section[NAME]] = section

Expand Down
8 changes: 7 additions & 1 deletion digitalpy/core/telemetry/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@

class Meter(ABC):

def __init__(self, provider: MetricsProvider, meter_name=None):
def __init__(self, provider: MetricsProvider, meter_name: str=None):
"""the constructor for the abstract meter class
Args:
provider (MetricsProvider): the metrics provider used to create this meter
meter_name (str, optional): the name of this meter instance. Defaults to None.
"""
if meter_name is None:
meter_name = self.__class__.__name__
self.meter = provider.get_meter(meter_name)
Expand Down
46 changes: 41 additions & 5 deletions digitalpy/core/zmanager/impl/async_action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


class AsyncActionMapper(ActionMapper):
# TODO the may need to be deprecated with the new zmanager architecture
#
# Constructor
# @param session
Expand All @@ -27,9 +28,18 @@ def __init__(
event_manager: EventManager,
configuration: Configuration,
formatter: Formatter,
routing_publisher_address,
routing_subscriber_address,
routing_publisher_address: str,
routing_subscriber_address: str,
):
"""_summary_
Args:
event_manager (EventManager): the event manager used to dispatch events to listeners
configuration (Configuration): the
formatter (Formatter): the formatter used to serialize message values to their specified format
routing_publisher_address (str): the address of the routing publisher to send the request
routing_subscriber_address (str): the address of the routing subscriber to receive the response
"""
self.eventManager = event_manager
self.configuration = configuration
self.is_finished = False
Expand Down Expand Up @@ -89,7 +99,18 @@ def get_responses(self) -> list:

def get_response(
self, response: Response, request: Request, listener: zmq.Socket, timeout=10000
):
) -> Response:
"""get the response from the routing_subscriber
Args:
response (Response): an empty response object
request (Request): an empty request object
listener (zmq.Socket): a socket to listen on
timeout (int, optional): how long to wait for a response. Defaults to 10000.
Returns:
Response: the passed respoonse object with data received by the routing subscriber
"""
try:
topic = f"/routing/response/{request.get_sender()}/{request.get_context()}/{request.get_action()}/{request.get_format()}/{request.get_id()}"
listener.RCVTIMEO = timeout
Expand All @@ -114,7 +135,17 @@ def get_response(
#
def process_action(
self, request: Request, response: Response, return_listener: bool
):
) -> zmq.Socket:
"""_summary_
Args:
request (Request): a request object with data to be sent to the routing publisher
response (Response): a response object to be passed the basic requset parameters
return_listener (bool): whether or not to return the listener
Returns:
zmq.Socket: the zmq socket with a subscription to the endpoint to be listened to
"""
self.eventManager.dispatch(
ApplicationEvent.NAME,
ApplicationEvent(ApplicationEvent.BEFORE_ROUTE_ACTION, request),
Expand All @@ -141,7 +172,12 @@ def process_action(

return routing_subscriber

def submit_request(self, request):
def submit_request(self, request: Request):
"""send a request object to the routing publisher
Args:
request (Request): the request object
"""
with self.get_routing_publisher() as routing_publisher:
routing_publisher.send_multipart(
[
Expand Down
16 changes: 14 additions & 2 deletions digitalpy/core/zmanager/impl/default_action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,20 @@ def initialize_tracing(self):
except Exception as e:
pass

def process_action(self, request: Request, response: Response):
def process_action(self, request: Request, response: Response)-> None:
# TODO break up this method
"""this is the main method for routing and processing requests within the action mapper
Args:
request (Request): the request containing an action to be routed
response (Response): the response to be filled with response data from the component
Raises:
Exception
Returns:
None: return a none
"""
# this is added for the sake of the latter use of multiprocessing
if not self.tracing_provider:
self.initialize_tracing()
Expand Down Expand Up @@ -82,7 +94,7 @@ def process_action(self, request: Request, response: Response):
+ actionKey
+ ". Request was referrer?context?action"
)
Exception(request, response)
raise Exception("No controller found for best action key "+ actionKey)

# check if the controller definition contains a method besides the class name
controllerMethod = None
Expand Down
42 changes: 36 additions & 6 deletions digitalpy/core/zmanager/impl/default_request.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,52 @@
from digitalpy.core.zmanager.impl.abstract_controller_message import AbstractControllerMessage
from digitalpy.core.zmanager.request import Request
from digitalpy.core.zmanager.response import Response
import json

class DefaultRequest(Request, AbstractControllerMessage):

"""default request object used in most cases
"""
def __init__(self):
"""constructor for default request object
"""
super().__init__()
self.response = None
self.method = None

def set_response(self, response):
def set_response(self, response: Response):
"""Set the response for this request.
This method sets the response object for this request and sets this request
as the request for the response.
Args:
response (Response): The response object to set for this request.
Returns:
None.
"""
self.response = response
if response.get_request != self:
if response.get_request() != self:
response.set_request(self)

def get_response(self):

def get_response(self) -> Response:
"""Get the response for this request.
This method returns the response object for this request.
Returns:
Response: The response object for this request.
"""
return self.response

def get_method(self):
def get_method(self) -> str:
"""Get the method of this request.
This method returns the method of this request.
Returns:
str: The method of this request.
"""
return self.method


24 changes: 21 additions & 3 deletions digitalpy/core/zmanager/impl/default_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,34 @@ class DefaultResponse(Response, AbstractControllerMessage):
def __init__(self):
super().__init__()

def set_request(self, request: Request):
def set_request(self, request: Request) -> None:
"""Sets the request object for this response object. If the response object has not been set in the request object, it also sets the response object in the request object.
Args:
request (Request): the request object to be set in this response object
Returns:
None: returns None"""
self.request = request
if request.get_response() != self:
request.set_response(self)

def get_request(self):
"""
Gets the request object associated with this response object.
Returns:
Request: the request object associated with this response object"""
return self.request

def set_status(self, status):
def set_status(self, status: int) -> None:
"""Sets the status of this response object.
Args:
status (int): the status code to be set for this response object
Returns:
None: returns None"""
self.status = status

def get_status(self, status):
def get_status(self):
"""Gets the status of this response object.
Returns:
int: the status code of this response object"""
return self.status
Loading

0 comments on commit 848d3ae

Please sign in to comment.