Skip to content

Commit

Permalink
Merge pull request #31 from linomp/feat/local-dev-mqtt
Browse files Browse the repository at this point in the history
Removed cloud dependency for local mqtt dev
  • Loading branch information
linomp authored May 14, 2024
2 parents c81011a + 4b5c1f4 commit e2e78ee
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 61 deletions.
17 changes: 17 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: "3.8"

services:
hivemq-broker:
image: hivemq/hivemq-ce
ports:
- "8883:1883"
- "8884:8000"
networks:
- my-network
restart: always

networks:
my-network:

# docker compose -f docker-compose-dev.yml up -d
# docker compose -f docker-compose-dev.yml down -v
100 changes: 54 additions & 46 deletions mvp/client/ui/src/messaging/mqttFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,62 @@
import mqtt, { type Packet } from "mqtt";
import type { GameSessionDTO, MqttFrontendConnectionDetails } from "src/api/generated";
import mqtt from "mqtt";
import type {GameSessionDTO, MqttFrontendConnectionDetails} from "src/api/generated";

const buildUrlAndOpts = (connectionDetails: MqttFrontendConnectionDetails): [string, mqtt.IClientOptions] => {
const brokerUrl = connectionDetails.host != "localhost" ? `wss://${connectionDetails.host}:${connectionDetails.port}/mqtt` :
`ws://${connectionDetails.host}:${connectionDetails.port}/mqtt`;

const opts: mqtt.IClientOptions = {
username: connectionDetails.host != "localhost" ? connectionDetails.username : undefined,
password: connectionDetails.host != "localhost" ? connectionDetails.password : undefined,
protocolVersion: 4, // MQTT 3.1.1
};

return [brokerUrl, opts];
}

export const getClient = async (
connectionDetails: MqttFrontendConnectionDetails,
messageHandler: (
topic: string,
message: Buffer,
) => Promise<void>
connectionDetails: MqttFrontendConnectionDetails,
messageHandler: (
topic: string,
message: Buffer,
) => Promise<void>
): Promise<[mqtt.MqttClient, () => void]> => {

const brokerUrl = `wss://${connectionDetails.host}:${connectionDetails.port}/mqtt`;
const [brokerUrl, opts] = buildUrlAndOpts(connectionDetails);

if (import.meta.env.VITE_DEBUG) {
console.log(`Connecting to MQTT broker at ${brokerUrl}`);
console.log(`Base topic: `, connectionDetails.base_topic);
}

try {
try {
const client = await mqtt.connectAsync(
brokerUrl,
opts
);

await client.subscribeAsync(`${connectionDetails.base_topic}`);

client.on("message", async (topic, message): Promise<void> => {
if (import.meta.env.VITE_DEBUG) {
const casted = JSON.parse(message.toString()) as GameSessionDTO;
console.log(`Received message on topic ${topic}`, casted);
}
await messageHandler(topic, message);
});

return [
client,
() => {
if (import.meta.env.VITE_DEBUG) {
console.log(`Connecting to MQTT broker at ${brokerUrl}`);
console.log(`Base topic: `, connectionDetails.base_topic);
console.log(`Unsubscribing from topic ${connectionDetails.base_topic}`);
}

const client = await mqtt.connectAsync(
brokerUrl,
{
username: connectionDetails.username,
password: connectionDetails.password,
protocolVersion: 4, // MQTT 3.1.1
}
);

await client.subscribeAsync(`${connectionDetails.base_topic}`);

client.on("message", async (topic, message): Promise<void> => {
if (import.meta.env.VITE_DEBUG) {
const casted = JSON.parse(message.toString()) as GameSessionDTO;
console.log(`Received message on topic ${topic}`, casted);
}
await messageHandler(topic, message);
});

return [
client,
() => {
if (import.meta.env.VITE_DEBUG) {
console.log(`Unsubscribing from topic ${connectionDetails.base_topic}`);
}
client.unsubscribe(`${connectionDetails.base_topic}`);
}
];

} catch (error) {
console.error(`Error connecting to MQTT broker ${brokerUrl}: `, error);
throw error;
}
}
client.unsubscribe(`${connectionDetails.base_topic}`);
}
];

} catch (error) {
console.error(`Error connecting to MQTT broker ${brokerUrl}: `, error);
throw error;
}
}
2 changes: 1 addition & 1 deletion mvp/server/messaging/MqttFrontendConnectionDetails.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

load_dotenv()

MQTT_HOST = os.environ.get("MQTT_HOST", "test.mosquitto.org")
MQTT_HOST = os.environ.get("MQTT_HOST", "localhost")
MQTT_WSS_PORT = int(os.environ.get("MQTT_WSS_PORT", 1883))
MQTT_FE_USER = os.environ.get("MQTT_FE_USER")
MQTT_FE_PASSWORD = os.environ.get("MQTT_FE_PASSWORD")
Expand Down
32 changes: 20 additions & 12 deletions mvp/server/messaging/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@
import paho.mqtt.client as paho
from dotenv import load_dotenv
from paho import mqtt
from paho.mqtt.enums import CallbackAPIVersion

from mvp.server.core.game import GameSessionDTO

load_dotenv()

MQTT_HOST = os.environ.get("MQTT_HOST", "test.mosquitto.org")
MQTT_HOST = os.environ.get("MQTT_HOST", "localhost")
MQTT_PORT = int(os.environ.get("MQTT_PORT", 1883))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")
MQTT_TOPIC_PREFIX = os.environ.get("MQTT_TOPIC_PREFIX", "pdmgame/sessions")
MQTT_QOS = int(os.environ.get("MQTT_QOS", 0))

DISABLE_MQTT = MQTT_USER is None
USE_MQTT_AUTH = MQTT_HOST is not "localhost"


def get_mqtt_client() -> 'MqttClientBase':
if DISABLE_MQTT:
return MqttClientBase()
return MqttClient()


def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
Expand All @@ -25,23 +33,26 @@ def on_connect(client, userdata, flags, rc, properties=None):
print(f"{datetime.now()}: MqttClient - connection failed. Code: {rc}")


class MqttClient:
class MqttClientBase:
def publish_session_state(self, client_uid: str, session: GameSessionDTO):
pass


class MqttClient(MqttClientBase):
__client__: paho.Client = None

def __init__(self):
if MQTT_USER is None:
return

client = paho.Client(
protocol=paho.MQTTv311,
callback_api_version=CallbackAPIVersion.VERSION2,
callback_api_version=paho.CallbackAPIVersion.VERSION2,
reconnect_on_failure=False,
clean_session=True
)
client.on_connect = on_connect

client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
if USE_MQTT_AUTH:
client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

print(
f"{datetime.now()}: MqttClient - attempting connection to {MQTT_HOST}:{MQTT_PORT} with user {MQTT_USER}"
Expand All @@ -53,9 +64,6 @@ def __init__(self):
self.__client__ = client

def publish_session_state(self, client_uid: str, session: GameSessionDTO):
if self.__client__ is None:
return

self.__client__.publish(
f"{MQTT_TOPIC_PREFIX}/{client_uid}", payload=bytes(session.json(), "utf-8"), qos=MQTT_QOS
)
4 changes: 2 additions & 2 deletions mvp/server/routers/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from mvp.server.core.game.GameSession import GameSession
from mvp.server.core.game.GameSessionDTO import GameSessionDTO
from mvp.server.messaging.MqttFrontendConnectionDetails import MqttFrontendConnectionDetails
from mvp.server.messaging.mqtt_client import MqttClient
from mvp.server.messaging.mqtt_client import get_mqtt_client

sessions: dict[str, GameSession] = {}
game_metrics = GameMetrics()
mqtt_client = MqttClient()
mqtt_client = get_mqtt_client()

router = APIRouter(
prefix="/sessions",
Expand Down

0 comments on commit e2e78ee

Please sign in to comment.