Skip to content

Commit

Permalink
Merge pull request #21 from zpieslak/19-move-mqtt-client-initializati…
Browse files Browse the repository at this point in the history
…on-to-appcall-method

Initialize mqtt client inside App.call
  • Loading branch information
zpieslak authored Oct 19, 2024
2 parents b2921a1 + 186f325 commit 9f75b70
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 211 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ The tests can be run with the following command:
To generate a coverage report:

coverage run -m unittest discover
coverage report
coverage report -m

To regenerate proto files

Expand Down
42 changes: 18 additions & 24 deletions mobilus_client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import secrets
import socket

from mobilus_client.client import Client
from mobilus_client.config import Config
from mobilus_client.messages.serializer import MessageSerializer
from mobilus_client.mqtt_client import MqttClient
from mobilus_client.registries.key import KeyRegistry
from mobilus_client.registries.message import MessageRegistry

Expand All @@ -14,50 +14,44 @@
class App:
def __init__(self, config: Config) -> None:
self.config = config
self.message_registry = MessageRegistry()
self.key_registry = KeyRegistry(config.user_key)
self.client = MqttClient(
client_id=secrets.token_hex(6).upper(),
transport=config.gateway_protocol,
userdata={
"config": config,
"key_registry": self.key_registry,
"message_registry": self.message_registry,
},
)

def call(self, commands: list[tuple[str, dict[str, str]]]) -> str:
if not commands:
return self._empty_response()

try:
# Connect to the MQTT broker and start the loop
self.client.connect(self.config.gateway_host, self.config.gateway_port, 60)
self.client.loop_start()
# Initialize client and registries
key_registry = KeyRegistry(self.config.user_key)
message_registry = MessageRegistry()

# Wait for the client to authenticate
self.client.authenticated_event.wait(timeout=self.config.auth_timeout_period)
client = Client(
client_id=secrets.token_hex(6).upper(),
config=self.config,
key_registry=key_registry,
message_registry=message_registry,
)

if not self.client.authenticated_event.is_set():
logger.error("Failed to authenticate with the gateway host")
try:
# Connect to the MQTT broker and authenticate
if not client.connect_and_authenticate():
return self._empty_response()


# Execute the provided commands
for command, params in commands:
self.client.send_request(command, **params)
client.send_request(command, **params)

# Wait for the completion event to be triggered
self.client.completed_event.wait(timeout=self.config.timeout_period)
client.completed_event.wait(timeout=self.config.timeout_period)
except socket.gaierror:
logger.error("Failed to connect to the gateway host")
except TimeoutError:
logger.error("Timeout occurred")
finally:
self.client.disconnect()
client.terminate()

# Return serialized responses from the message registry
return MessageSerializer.serialize_list_to_json(
self.message_registry.get_responses(),
message_registry.get_responses(),
)

def _empty_response(self) -> str:
Expand Down
123 changes: 123 additions & 0 deletions mobilus_client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import annotations

import logging
import threading
from typing import TYPE_CHECKING, Any

import paho.mqtt.client as mqtt

from mobilus_client.messages.encryptor import MessageEncryptor
from mobilus_client.messages.factory import MessageFactory
from mobilus_client.messages.status import MessageStatus
from mobilus_client.messages.validator import MessageValidator
from mobilus_client.proto import LoginRequest, LoginResponse

if TYPE_CHECKING:
from mobilus_client.config import Config
from mobilus_client.registries.key import KeyRegistry
from mobilus_client.registries.message import MessageRegistry

logger = logging.getLogger(__name__)


class Client:
def __init__(
self, client_id: str, config: Config, key_registry: KeyRegistry, message_registry: MessageRegistry) -> None:
self.config = config
self.client_id = client_id
self.key_registry = key_registry
self.message_registry = message_registry
self.authenticated_event = threading.Event()
self.completed_event = threading.Event()

self.mqtt_client = mqtt.Client(client_id=self.client_id, transport=config.gateway_protocol)
self._configure_client()

def connect_and_authenticate(self) -> bool:
self.mqtt_client.connect(self.config.gateway_host, self.config.gateway_port)
self.mqtt_client.loop_start()

# Wait for the client to authenticate
self.authenticated_event.wait(timeout=self.config.auth_timeout_period)

if not self.authenticated_event.is_set():
logger.error("Failed to authenticate with the gateway host")
return False

return True

def send_request(self, command: str, **params: str | bytes | int | None) -> None:
if not self.mqtt_client.is_connected():
logger.error("Sending request - %s failed. Client is not connected.", command)
return

message = MessageFactory.create_message(command, **params)
status = MessageValidator.validate(message)

if status != MessageStatus.SUCCESS or message is None:
logger.error("Command - %s returned an error - %s", command, status.name)
self.terminate()
return

if not isinstance(message, LoginRequest):
self.message_registry.register_request(message)

encrypted_message = MessageEncryptor.encrypt(
message,
self.client_id,
self.key_registry,
)

self.mqtt_client.publish("module", encrypted_message)

def terminate(self) -> None:
self.mqtt_client.disconnect()
self.mqtt_client.loop_stop()

def on_disconnect_callback(self, _client: mqtt.Client, _userdata: None, reason_code: int) -> None:
logger.info("Disconnected with result code - %s", reason_code)

def on_connect_callback(
self, _client: mqtt.Client, _userdata: None, _flags: dict[str, Any], _reason_code: int) -> None:
self.mqtt_client.subscribe([
(self.client_id, 0),
("clients", 0),
])

def on_subscribe_callback(self, _client: mqtt.Client, _userdata: None, _mid: int, _granted_qos: tuple[int]) -> None:
self.send_request(
"login",
login=self.config.user_login,
password=self.config.user_key,
)

def on_message_callback(self, _client: mqtt.Client, _userdata: None, mqtt_message: mqtt.MQTTMessage) -> None:
logger.info("Received message on topic - %s", mqtt_message.topic)

message = MessageEncryptor.decrypt(mqtt_message.payload, self.key_registry)
logger.info("Decrypted message - %s", type(message).__name__)

status = MessageValidator.validate(message)

if status != MessageStatus.SUCCESS or message is None:
logger.error("Message - %s returned an error - %s", type(message).__name__, status.name)
self.terminate()
return

logger.info("Message - %s validated successfully", type(message).__name__)

if isinstance(message, LoginResponse):
self.key_registry.register_keys(message)
self.authenticated_event.set()
else:
self.message_registry.register_response(message)

if self.message_registry.all_responses_received():
self.completed_event.set()

def _configure_client(self) -> None:
self.mqtt_client.enable_logger(logger)
self.mqtt_client.on_connect = self.on_connect_callback
self.mqtt_client.on_disconnect = self.on_disconnect_callback
self.mqtt_client.on_subscribe = self.on_subscribe_callback
self.mqtt_client.on_message = self.on_message_callback
91 changes: 0 additions & 91 deletions mobilus_client/mqtt_client.py

This file was deleted.

Loading

0 comments on commit 9f75b70

Please sign in to comment.