Skip to content

Commit

Permalink
Merge pull request #36 from bringauto/BAF-1029/fix_for_integration_tests
Browse files Browse the repository at this point in the history
BAF 1029 - Bug fixes
  • Loading branch information
MarioIvancik authored Jan 9, 2025
2 parents f2c3cfb + b720166 commit db813e4
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 15 deletions.
2 changes: 1 addition & 1 deletion external_server/adapters/mqtt/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def _get_message(self, ignore_timeout: bool = False) -> _ExternalClientMsg | Non
def _start_communication(self) -> int:
"""Set up the MQTT client traffic processing (callbacks and subscriptions) and ensure the traffic processing loop is running."""
self._set_up_callbacks()
self._mqtt_client.subscribe(self._subscribe_topic, qos=_QOS)
code = self._start_client_loop()
connection = self._wait_for_connection(_MQTT_CONNECTION_STATE_UPDATE_TIMEOUT)
if connection:
Expand Down Expand Up @@ -370,6 +369,7 @@ def _on_connect(self, client: _Client, data: Any, flags: Any, rc, properties: An
- `rc` The connection result code indicating success or failure.
- `properties` The properties associated with the connection event.
"""
self._mqtt_client.subscribe(self._subscribe_topic, qos=_QOS)
self._log_connection_result(rc)

def _on_disconnect(self, client: _Client, data: Any, flags: Any, rc, properties: Any) -> None:
Expand Down
12 changes: 10 additions & 2 deletions external_server/server/all_cars.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,25 @@ def car_servers(self) -> dict[str, CarServer]:
"""Return parts of the external server responsible for each car defined in configuration."""
return self._car_servers.copy()

def start(self) -> None:
def start(self, wait_for_join: bool = False) -> None:
"""Start the external server.
For reach car defined in the configuration, create a separate thread and inside that,
start an instance of the CarServer class.
The 'wait_for_join' parameter is used to determine if the main thread should wait
for all car threads to finish or just return and let the car threads run in the background.
"""
for car in self._car_servers:
self._car_threads[car] = threading.Thread(target=self._car_servers[car].start)
for t in self._car_threads.values():
t.start()
# The following for loop ensures, that the main thread waits for all car threads to finish
# This allows for the external_server_main script to run while the car threads are running
# This ensures the server can be stopped by KeyboardInterrupt
if wait_for_join:
for t in self._car_threads.values():
t.join()

def stop(self, reason: str = "") -> None:
"""Stop the external server.
Expand All @@ -79,7 +88,6 @@ def stop(self, reason: str = "") -> None:
try:
for car_server in self.car_servers().values():
car_server.stop(reason)
car_server._event_queue.add(_EventType.SERVER_STOPPED)
for car_thread in self._car_threads.values():
if car_thread.is_alive():
car_thread.join()
Expand Down
17 changes: 12 additions & 5 deletions external_server/server/single_car.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,13 @@ def stop(self, reason: str = "") -> None:
"""
msg = f"Stopping the external server part for car {self._config.car_name} of company {self._config.company_name}."
self._set_state(ServerState.STOPPED)
self._event_queue.add(_EventType.SERVER_STOPPED)
if reason:
msg += f" Reason: {reason}"
logger.info(msg, self._car_name)
self._set_running_flag(False)
self._clear_context()
self._clear_modules()
self._clear_context()

def tls_set(self, ca_certs: str, certfile: str, keyfile: str) -> None:
"Set tls security to MQTT client."
Expand Down Expand Up @@ -306,8 +307,9 @@ def _clear_context(self) -> None:
self._command_checker.reset()
self._status_checker.reset()
for device in self._known_devices.list_connected():
module_adapter = self._modules[device.module_id].api
module_adapter.device_disconnected(DisconnectTypes.timeout, device.to_device())
if device.module_id in self._modules:
module_adapter = self._modules[device.module_id].api
module_adapter.device_disconnected(DisconnectTypes.timeout, device.to_device())
self._known_devices.clear()
self._event_queue.clear()

Expand Down Expand Up @@ -398,7 +400,7 @@ def _disconnect_device(self, disconnect_types: DisconnectTypes, device: _Device)
drepr = device_repr(device)
logger.info(f"Disconnecting device {drepr}.", self._car_name)
if self._known_devices.is_not_connected(device):
logger.info(f"Device {drepr} is already disconnected.", self._car_name)
logger.warning(f"Device {drepr} is already disconnected.", self._car_name)
else:
self._known_devices.remove(DevicePy.from_device(device))
code = self._modules[device.module].api.device_disconnected(disconnect_types, device)
Expand Down Expand Up @@ -517,6 +519,8 @@ def _handle_checked_status(self, status: _Status) -> None:
module.api.forward_status(status)
logger.info(f"Status from '{device_repr(device)}' has been forwarded.", self._car_name)
self._publish_status_response(status)
if status.deviceState == _Status.DISCONNECT:
self._disconnect_device(DisconnectTypes.announced, device)

def _handle_checked_status_by_device_state(self, status: _Status, device: _Device) -> bool:
"""Handle the status that has been checked by the status checker.
Expand All @@ -532,11 +536,14 @@ def _handle_checked_status_by_device_state(self, status: _Status, device: _Devic
logger.warning("Device is not connected. Ignoring status.", self._car_name)
status_ok = False
case _Status.DISCONNECT:
status_ok = self._disconnect_device(DisconnectTypes.announced, device)
logger.info(
f"Received status with a disconnect message for device {device_repr(device)}.", self._car_name
)
case _:
logger.warning(
f"Unknown device state: {status.deviceState}. Ignoring status.", self._car_name
)
status_ok = False
return status_ok

def _handle_command(self, module_id: int, data: bytes, device: _Device) -> None:
Expand Down
3 changes: 0 additions & 3 deletions external_server/server_module/command_waiting_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def __init__(
module_connection_check: Callable[[], bool],
event_queue: _EventQueue,
timeout_ms: int = 1000,
poll_interval: float = 0.2,
) -> None:

self._api_adapter: APIClientAdapter = api_client
Expand All @@ -83,7 +82,6 @@ def __init__(
self._continue_thread = True
self._timeout_ms = timeout_ms
self._car = api_client.car
self._poll_interval = poll_interval

@property
def timeout_ms(self) -> int:
Expand Down Expand Up @@ -158,4 +156,3 @@ def _put_single_popped_command_into_queue(self, command: bytes, device: _Device)
def _main_thread(self) -> None:
while self._continue_thread:
self.poll_commands()
time.sleep(self._poll_interval)
2 changes: 1 addition & 1 deletion external_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def main() -> None:
server.set_tls(args.ca, args.cert, args.key)

try:
server.start()
server.start(wait_for_join=True)
except KeyboardInterrupt:
server.stop(reason="keyboard interrupt")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "external_server"
version = "2.1.1"
version = "2.1.3"


[tool.setuptools.packages.find]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


def wait_for_server_connection(
server: CarServer, test_case: unittest.TestCase, timeout: float = 2.0
server: CarServer, test_case: unittest.TestCase, timeout: float = 5.0
):
t = time.time()
while time.time() - t < timeout:
Expand All @@ -47,7 +47,7 @@ def wait_for_server_connection(


def _wait_for_server_initialization(
server: CarServer, test_case: unittest.TestCase, timeout: float = 2.0
server: CarServer, test_case: unittest.TestCase, timeout: float = 5.0
):
t = time.time()
while time.time() - t < timeout:
Expand Down

0 comments on commit db813e4

Please sign in to comment.