-
Notifications
You must be signed in to change notification settings - Fork 6
/
DigitalPy.py
181 lines (152 loc) · 6.21 KB
/
DigitalPy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#######################################################
#
# DigitalPy.py
# Python implementation of the Class DigitalPy
# Generated by Enterprise Architect
# Created on: 28-Dec-2022 1:18:23 PM
# Original author: FreeTAKTeam
#
#######################################################
import multiprocessing
import os
import pathlib
from digitalpy.core.digipy_configuration.configuration import Configuration
from digitalpy.core.digipy_configuration.impl.inifile_configuration import InifileConfiguration
from digitalpy.core.component_management.impl.component_registration_handler import ComponentRegistrationHandler
from digitalpy.core.main.factory import Factory
from digitalpy.core.main.impl.default_factory import DefaultFactory
from digitalpy.core.main.object_factory import ObjectFactory
class DigitalPy:
"""this is the executable of the digitalPy framework, providing the starting point
for a bare bone application.
"""
def __init__(self):
# Set up necessary resources and configurations for the application to run
self.resources = []
self.configuration: Configuration = InifileConfiguration("")
# register the digitalpy action mapping under ../digitalpy/action_mapping.ini
self.configuration.add_configuration(
str(
pathlib.PurePath(
pathlib.PurePath(os.path.abspath(__file__)).parent.parent,
"action_mapping.ini",
)
),
)
# the central digitalpy configuration used throughout the application
self.factory: Factory = DefaultFactory(self.configuration)
# register the factory and configuration to the object factory singleton
ObjectFactory.configure(self.factory)
ObjectFactory.register_instance("configuration", self.configuration)
# factory instance is registered for use by the routing worker so that
# the instances in the instance dictionary can be preserved when the
# new object factory is instantiated in the sub-process
ObjectFactory.register_instance("factory", self.factory)
def register_components(self):
"""register all components of the application
"""
# register base digitalpy components
digipy_components = ComponentRegistrationHandler.discover_components(
component_folder_path=pathlib.PurePath(
str(
pathlib.PurePath(
os.path.abspath(__file__)
).parent.parent
),
)
)
for digipy_component in digipy_components:
ComponentRegistrationHandler.register_component(
digipy_component,
"digitalpy.core",
self.configuration,
)
def start(self):
"""Begin the execution of the application, this should be overriden
by any inheriting classes"""
self.register_components()
self.configure()
self.start_services()
def start_services(self):
self.start_routing_proxy_service()
self.start_integration_manager_service()
def start_routing_proxy_service(self):
"""this function is responsible for starting the routing proxy service"""
try:
# begin the routing proxy
self.routing_proxy_service = ObjectFactory.get_instance("Subject")
proc = multiprocessing.Process(
target=self.routing_proxy_service.begin_routing
)
proc.start()
return 1
except Exception as ex:
return -1
def stop_routing_proxy_service(self):
"""this function is responsible for stopping the routing proxy service"""
try:
# TODO: add a pre termination call to shutdown workers and sockets before a
# termination to prevent hanging resources
if self.routing_proxy_service.is_alive():
self.routing_proxy_service.terminate()
self.routing_proxy_service.join()
else:
self.routing_proxy_service.join()
return 1
except Exception as e:
return -1
def start_integration_manager_service(self) -> bool:
"""Starts the integration manager service.
Returns:
bool: True if successful, False otherwise.
"""
try:
# begin the integration_manager_service
self.integration_manager_service = ObjectFactory.get_instance("IntegrationManager")
proc = multiprocessing.Process(target=self.integration_manager_service.start)
proc.start()
return True
except Exception as ex:
raise ex
def stop_integration_manager_service(self) -> bool:
"""Stops the integration manager service.
Returns:
bool: True if successful, False otherwise.
"""
try:
if self.integration_manager_service.is_alive():
self.integration_manager_service.terminate()
self.integration_manager_service.join()
else:
self.routing_proxy_service.join()
return True
except Exception as ex:
raise ex
def stop(self):
# End the execution of the application
pass
def restart(self):
# End and then restart the execution of the application
self.stop()
self.start()
def save_state(self):
# Persist the current state of the application to allow for future restoration
pass
def load_state(self):
# Restore the application to a previously saved state
pass
def configure(self):
"""Set or modify the configuration of the application"""
def get_status(self):
# Retrieve the current status of the application (e.g. running, stopped, etc.)
pass
def get_logs(self):
# Retrieve the log records generated by the application
pass
def handle_errors(self, error):
# Deal with errors that occur during the execution of the application
pass
def shutdown(self):
# Close all resources and terminate the application
self.resources = []
self.configurations = {}