Skip to content

Commit

Permalink
Merge pull request #16 from FreeTAKTeam/telemetry-serialization
Browse files Browse the repository at this point in the history
added support for pickling of telemetry objects
  • Loading branch information
naman108 authored Nov 23, 2022
2 parents 68047e1 + 9f23067 commit 9ff38bf
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 63 deletions.
12 changes: 7 additions & 5 deletions digitalpy/component/impl/default_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
log_file_path,
component_name=None,
type_mapping=None,
action_mapper=None,
action_mapper: DefaultActionMapper = None,
base=object,
request=None,
response=None,
Expand Down Expand Up @@ -49,8 +49,8 @@ def __init__(
else:
self.component_name = self.__class__.__name__

# get a tacer from the tracer provider
# get a tracer from the tracer provider

if tracing_provider_instance is not None:
self.tracer: Tracer = tracing_provider_instance.create_tracer(
component_name
Expand Down Expand Up @@ -93,6 +93,8 @@ def execute(self, method):
except Exception as e:
self.logger.fatal(str(e))

self.response.set_value("tracer", None)

def get_logs(self):
"""get all the log files available"""
return self.log_manager.get_logs()
Expand Down Expand Up @@ -127,7 +129,7 @@ def _register_type_mapping(self):
request.set_action("RegisterMachineToHumanMapping")
request.set_value("machine_to_human_mapping", self.type_mapping)

actionmapper = ObjectFactory.get_instance("syncactionMapper")
actionmapper = ObjectFactory.get_instance("SyncActionMapper")
response = ObjectFactory.get_new_instance("response")
actionmapper.process_action(request, response)

Expand All @@ -138,7 +140,7 @@ def _register_type_mapping(self):
"human_to_machine_mapping", {k: v for v, k in self.type_mapping.items()}
)

actionmapper = ObjectFactory.get_instance("actionMapper")
actionmapper = ObjectFactory.get_instance("SyncActionMapper")
response = ObjectFactory.get_new_instance("response")
actionmapper.process_action(request, response)

Expand Down
59 changes: 32 additions & 27 deletions digitalpy/logic/impl/default_business_rule_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,35 @@ def _get_matchable_object(self, matchable, rule_dict):
return matchable

def _evaluate_actions(self, rule_dict):
if "actions" in rule_dict:
for action in rule_dict["actions"]:
# here we define the current request and response objects
cur_request = self.get_request()
cur_response = self.get_response()

# we instantiate the sub request object to use all of the same properties as
# the current request except for the action which is taken from the business rule configuration
# and the sender which is set to the current controller
sub_request = ObjectFactory.get_new_instance("request")
sub_request.set_sender(self.__class__.__name__)
sub_request.set_context(cur_request.get_context())
sub_request.set_action(action)
sub_request.set_values(cur_request.get_values())
sub_request.set_format(cur_request.get_format())
# here we instantiate the sub response object, it is simpler than the request object
# taking only the format from the current response
sub_response = ObjectFactory.get_new_instance("response")
sub_response.set_format(cur_response.get_format())
# finally we call the internal_action_mapper to process the action
# it should be noted that the internal_action_mapper is synchronous and can
# only access the internal action mapping configuration
self.internal_action_mapper.process_action(sub_request, sub_response)

# add all the sub_response values to the current response
for key, value in sub_response.get_values().items():
cur_response.set_value(key, value)
try:
if "actions" in rule_dict:
for action in rule_dict["actions"]:
# here we define the current request and response objects
cur_request = self.get_request()
cur_response = self.get_response()

# we instantiate the sub request object to use all of the same properties as
# the current request except for the action which is taken from the business rule configuration
# and the sender which is set to the current controller
sub_request = ObjectFactory.get_new_instance("request")
sub_request.set_sender(self.__class__.__name__)
sub_request.set_context(cur_request.get_context())
sub_request.set_action(action)
sub_request.set_values(cur_request.get_values())
sub_request.set_format(cur_request.get_format())
# here we instantiate the sub response object, it is simpler than the request object
# taking only the format from the current response
sub_response = ObjectFactory.get_new_instance("response")
sub_response.set_format(cur_response.get_format())
# finally we call the internal_action_mapper to process the action
# it should be noted that the internal_action_mapper is synchronous and can
# only access the internal action mapping configuration
self.internal_action_mapper.process_action(
sub_request, sub_response
)

# add all the sub_response values to the current response
for key, value in sub_response.get_values().items():
cur_response.set_value(key, value)
except Exception as e:
raise e
10 changes: 8 additions & 2 deletions digitalpy/routing/impl/default_action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def __init__(
#
def initialize_tracing(self):
try:
self.tracing_provider = ObjectFactory.get_instance("tracingprovider")
# new instance used because the contents of tracing_provider cant be serialized between
# processes and so shouldnt be persisted in the factory hance we have custom handling
# to avoid duplication of the tracing provider
self.tracing_provider = ObjectFactory.get_new_instance("tracingprovider")
ObjectFactory.register_instance(
"tracingproviderinstance", self.tracing_provider
)
Expand Down Expand Up @@ -119,7 +122,10 @@ def process_action(self, request: Request, response: Response):
controllerObj,
),
)
controllerObj.execute(controllerMethod)
try:
controllerObj.execute(controllerMethod)
except Exception as e:
raise e
self.eventManager.dispatch(
ApplicationEvent.NAME,
ApplicationEvent(
Expand Down
30 changes: 25 additions & 5 deletions digitalpy/telemetry/impl/opentel_tracing_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from digitalpy.telemetry.impl.opentel_tracing_exporter import OpenTelTracingExporter

class OpenTelTracingProvider(TracingProvider):
"""tracing provider implementation for the open telemetry protocol.
"""
def __init__(self):
"""tracing provider implementation for the open telemetry protocol."""

def initialize_tracing(self):
self.provider = TracerProvider()
exporter = ObjectFactory.get_instance("TracerExporter")
exporter = ObjectFactory.get_new_instance("TracerExporter")
self.exporter = OpenTelTracingExporter(exporter)
self.processor = ObjectFactory.get_instance("TracerProcessor", dynamic_configuration={"span_exporter": exporter})
self.processor = ObjectFactory.get_new_instance("TracerProcessor", dynamic_configuration={"span_exporter": exporter})
self.provider.add_span_processor(self.processor)

def create_tracer(self, tracer_name: str) -> OpenTelTracer:
Expand All @@ -24,4 +24,24 @@ def create_tracer(self, tracer_name: str) -> OpenTelTracer:
OpenTelTracer: an open telemetry tracer instance
from the current provider instance with the passed name
"""
if not hasattr(self, 'provider'):
self.initialize_tracing()
return OpenTelTracer(self.provider.get_tracer(tracer_name))

# __getstate__ used to address the issue of passing a dictionary containing the
# openteltracingprovider to a multiprocess, basically the shutdown
# thread object still causes problems in the processor so deleting
# the object every time a class instance is serialized
# addresses the issue.

def __getstate__(self):
"""get the state of the object in without un-serializable objects"""
if hasattr(self, "provider"):
self.provider.shutdown()
del self.provider
if hasattr(self, "processor"):
self.processor.shutdown()
del self.processor
if hasattr(self, "exporter"):
del self.exporter
return self.__dict__
5 changes: 5 additions & 0 deletions digitalpy/telemetry/tracing_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from abc import ABC

class TracingProcessor(ABC):
"""class used to process and retrieve traces from tracers"""
pass
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages

setup(name='digitalpy',
version='0.2.5.2',
version='0.2.5.3',
description="A python implementation of the aphrodite's specification, heavily based on WCMF",
author='Natha Paquette',
author_email='natha.paquette@gmail.com',
Expand Down
23 changes: 0 additions & 23 deletions tests/test_domain/test_domain.py

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from digitalpy.component.impl.default_facade import DefaultFacade

class TestFacade(DefaultFacade):
def __init__(self,
1 change: 1 addition & 0 deletions tests/test_registration/test_registration_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
def test_component_discovery(se)

0 comments on commit 9ff38bf

Please sign in to comment.