Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed cloud dependency for local mqtt dev #31

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: "3.8"

services:
hivemq-broker:
image: hivemq/hivemq-ce
ports:
- "8883:1883"
- "8884:8000"
networks:
- my-network
restart: always

networks:
my-network:

# docker compose -f docker-compose-dev.yml up -d
# docker compose -f docker-compose-dev.yml down -v
100 changes: 54 additions & 46 deletions mvp/client/ui/src/messaging/mqttFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,62 @@
import mqtt, { type Packet } from "mqtt";
import type { GameSessionDTO, MqttFrontendConnectionDetails } from "src/api/generated";
import mqtt from "mqtt";
import type {GameSessionDTO, MqttFrontendConnectionDetails} from "src/api/generated";

const buildUrlAndOpts = (connectionDetails: MqttFrontendConnectionDetails): [string, mqtt.IClientOptions] => {
const brokerUrl = connectionDetails.host != "localhost" ? `wss://${connectionDetails.host}:${connectionDetails.port}/mqtt` :
`ws://${connectionDetails.host}:${connectionDetails.port}/mqtt`;

const opts: mqtt.IClientOptions = {
username: connectionDetails.host != "localhost" ? connectionDetails.username : undefined,
password: connectionDetails.host != "localhost" ? connectionDetails.password : undefined,
protocolVersion: 4, // MQTT 3.1.1
};

return [brokerUrl, opts];
}

export const getClient = async (
connectionDetails: MqttFrontendConnectionDetails,
messageHandler: (
topic: string,
message: Buffer,
) => Promise<void>
connectionDetails: MqttFrontendConnectionDetails,
messageHandler: (
topic: string,
message: Buffer,
) => Promise<void>
): Promise<[mqtt.MqttClient, () => void]> => {

const brokerUrl = `wss://${connectionDetails.host}:${connectionDetails.port}/mqtt`;
const [brokerUrl, opts] = buildUrlAndOpts(connectionDetails);

if (import.meta.env.VITE_DEBUG) {
console.log(`Connecting to MQTT broker at ${brokerUrl}`);
console.log(`Base topic: `, connectionDetails.base_topic);
}

try {
try {
const client = await mqtt.connectAsync(
brokerUrl,
opts
);

await client.subscribeAsync(`${connectionDetails.base_topic}`);

client.on("message", async (topic, message): Promise<void> => {
if (import.meta.env.VITE_DEBUG) {
const casted = JSON.parse(message.toString()) as GameSessionDTO;
console.log(`Received message on topic ${topic}`, casted);
}
await messageHandler(topic, message);
});

return [
client,
() => {
if (import.meta.env.VITE_DEBUG) {
console.log(`Connecting to MQTT broker at ${brokerUrl}`);
console.log(`Base topic: `, connectionDetails.base_topic);
console.log(`Unsubscribing from topic ${connectionDetails.base_topic}`);
}

const client = await mqtt.connectAsync(
brokerUrl,
{
username: connectionDetails.username,
password: connectionDetails.password,
protocolVersion: 4, // MQTT 3.1.1
}
);

await client.subscribeAsync(`${connectionDetails.base_topic}`);

client.on("message", async (topic, message): Promise<void> => {
if (import.meta.env.VITE_DEBUG) {
const casted = JSON.parse(message.toString()) as GameSessionDTO;
console.log(`Received message on topic ${topic}`, casted);
}
await messageHandler(topic, message);
});

return [
client,
() => {
if (import.meta.env.VITE_DEBUG) {
console.log(`Unsubscribing from topic ${connectionDetails.base_topic}`);
}
client.unsubscribe(`${connectionDetails.base_topic}`);
}
];

} catch (error) {
console.error(`Error connecting to MQTT broker ${brokerUrl}: `, error);
throw error;
}
}
client.unsubscribe(`${connectionDetails.base_topic}`);
}
];

} catch (error) {
console.error(`Error connecting to MQTT broker ${brokerUrl}: `, error);
throw error;
}
}
2 changes: 1 addition & 1 deletion mvp/server/messaging/MqttFrontendConnectionDetails.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

load_dotenv()

MQTT_HOST = os.environ.get("MQTT_HOST", "test.mosquitto.org")
MQTT_HOST = os.environ.get("MQTT_HOST", "localhost")
MQTT_WSS_PORT = int(os.environ.get("MQTT_WSS_PORT", 1883))
MQTT_FE_USER = os.environ.get("MQTT_FE_USER")
MQTT_FE_PASSWORD = os.environ.get("MQTT_FE_PASSWORD")
Expand Down
32 changes: 20 additions & 12 deletions mvp/server/messaging/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@
import paho.mqtt.client as paho
from dotenv import load_dotenv
from paho import mqtt
from paho.mqtt.enums import CallbackAPIVersion

from mvp.server.core.game import GameSessionDTO

load_dotenv()

MQTT_HOST = os.environ.get("MQTT_HOST", "test.mosquitto.org")
MQTT_HOST = os.environ.get("MQTT_HOST", "localhost")
MQTT_PORT = int(os.environ.get("MQTT_PORT", 1883))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")
MQTT_TOPIC_PREFIX = os.environ.get("MQTT_TOPIC_PREFIX", "pdmgame/sessions")
MQTT_QOS = int(os.environ.get("MQTT_QOS", 0))

DISABLE_MQTT = MQTT_USER is None
USE_MQTT_AUTH = MQTT_HOST is not "localhost"


def get_mqtt_client() -> 'MqttClientBase':
if DISABLE_MQTT:
return MqttClientBase()
return MqttClient()


def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
Expand All @@ -25,23 +33,26 @@ def on_connect(client, userdata, flags, rc, properties=None):
print(f"{datetime.now()}: MqttClient - connection failed. Code: {rc}")


class MqttClient:
class MqttClientBase:
def publish_session_state(self, client_uid: str, session: GameSessionDTO):
pass


class MqttClient(MqttClientBase):
__client__: paho.Client = None

def __init__(self):
if MQTT_USER is None:
return

client = paho.Client(
protocol=paho.MQTTv311,
callback_api_version=CallbackAPIVersion.VERSION2,
callback_api_version=paho.CallbackAPIVersion.VERSION2,
reconnect_on_failure=False,
clean_session=True
)
client.on_connect = on_connect

client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
if USE_MQTT_AUTH:
client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

print(
f"{datetime.now()}: MqttClient - attempting connection to {MQTT_HOST}:{MQTT_PORT} with user {MQTT_USER}"
Expand All @@ -53,9 +64,6 @@ def __init__(self):
self.__client__ = client

def publish_session_state(self, client_uid: str, session: GameSessionDTO):
if self.__client__ is None:
return

self.__client__.publish(
f"{MQTT_TOPIC_PREFIX}/{client_uid}", payload=bytes(session.json(), "utf-8"), qos=MQTT_QOS
)
4 changes: 2 additions & 2 deletions mvp/server/routers/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from mvp.server.core.game.GameSession import GameSession
from mvp.server.core.game.GameSessionDTO import GameSessionDTO
from mvp.server.messaging.MqttFrontendConnectionDetails import MqttFrontendConnectionDetails
from mvp.server.messaging.mqtt_client import MqttClient
from mvp.server.messaging.mqtt_client import get_mqtt_client

sessions: dict[str, GameSession] = {}
game_metrics = GameMetrics()
mqtt_client = MqttClient()
mqtt_client = get_mqtt_client()

router = APIRouter(
prefix="/sessions",
Expand Down
Loading