Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mqtt #130

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions mango/container/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
"""
super().__init__(
codec=codec,
addr=broker_addr,
addr=inbox_topic or client_id,
clock=clock,
name=client_id,
**kwargs,
Expand All @@ -83,7 +83,8 @@
self.client_id: str = client_id
# the client will be created on start.
self.mqtt_client: paho.Client = None
self.inbox_topic: None | str = inbox_topic
self.inbox_topic: None | str = inbox_topic or client_id
self.broker_addr = broker_addr
# dict mapping additionally subscribed topics to a set of aids
self.additional_subscriptions: dict[str, set[str]] = {}
# Future for pending sub requests
Expand All @@ -93,7 +94,7 @@
self._loop = asyncio.get_event_loop()
if not self.client_id:
raise ValueError("client_id is required!")
if not self.addr:
if not self.broker_addr:
raise ValueError("broker_addr is required!")

# get parameters for Client.init()
Expand Down Expand Up @@ -141,29 +142,29 @@
mqtt_messenger.on_connect = on_con

# check broker_addr input and connect
if isinstance(self.addr, tuple):
if not 0 < len(self.addr) < 4:
if isinstance(self.broker_addr, tuple):
if not 0 < len(self.broker_addr) < 4:
raise ValueError("Invalid broker address argument count")
if len(self.addr) > 0 and not isinstance(self.addr[0], str):
if len(self.broker_addr) > 0 and not isinstance(self.broker_addr[0], str):
raise ValueError("Invalid broker address - host must be str")
if len(self.addr) > 1 and not isinstance(self.addr[1], int):
if len(self.broker_addr) > 1 and not isinstance(self.broker_addr[1], int):
raise ValueError("Invalid broker address - port must be int")
if len(self.addr) > 2 and not isinstance(self.addr[2], int):
if len(self.broker_addr) > 2 and not isinstance(self.broker_addr[2], int):
raise ValueError("Invalid broker address - keepalive must be int")
mqtt_messenger.connect(*self.addr, **self._kwargs)
mqtt_messenger.connect(*self.broker_addr, **self._kwargs)

elif isinstance(self.addr, dict):
if "hostname" not in self.addr.keys():
elif isinstance(self.broker_addr, dict):
if "hostname" not in self.broker_addr.keys():

Check warning on line 157 in mango/container/mqtt.py

View check run for this annotation

Codecov / codecov/patch

mango/container/mqtt.py#L156-L157

Added lines #L156 - L157 were not covered by tests
raise ValueError("Invalid broker address - host not given")
mqtt_messenger.connect(**self.addr, **self._kwargs)
mqtt_messenger.connect(**self.broker_addr, **self._kwargs)

Check warning on line 159 in mango/container/mqtt.py

View check run for this annotation

Codecov / codecov/patch

mango/container/mqtt.py#L159

Added line #L159 was not covered by tests

else:
if not isinstance(self.addr, str):
if not isinstance(self.broker_addr, str):

Check warning on line 162 in mango/container/mqtt.py

View check run for this annotation

Codecov / codecov/patch

mango/container/mqtt.py#L162

Added line #L162 was not covered by tests
raise ValueError("Invalid broker address")
mqtt_messenger.connect(self.addr, **self._kwargs)
mqtt_messenger.connect(self.broker_addr, **self._kwargs)

Check warning on line 164 in mango/container/mqtt.py

View check run for this annotation

Codecov / codecov/patch

mango/container/mqtt.py#L164

Added line #L164 was not covered by tests

logger.info(
"[%s]: Going to connect to broker at %s..", self.client_id, self.addr
"[%s]: Going to connect to broker at %s..", self.client_id, self.broker_addr
)

counter = 0
Expand All @@ -178,12 +179,12 @@
if not connected.done():
# timeout
raise ConnectionError(
f"Connection to {self.addr} could not be "
f"Connection to {self.broker_addr} could not be "
f"established after {counter * 0.1} seconds"
)
if connected.result() != 0:
raise ConnectionError(
f"Connection to {self.addr} could not be "
f"Connection to {self.broker_addr} could not be "
f"set up. Callback returner error code "
f"{connected.result()}"
)
Expand Down Expand Up @@ -212,7 +213,7 @@
# subscription to inbox topic was not successful
mqtt_messenger.disconnect()
raise ConnectionError(
f"Subscription request to {self.inbox_topic} at {self.addr} "
f"Subscription request to {self.inbox_topic} at {self.broker_addr} "
f"returned error code: {result}"
)

Expand All @@ -224,7 +225,7 @@
counter += 1
if not subscribed.done():
raise ConnectionError(
f"Subscription request to {self.inbox_topic} at {self.addr} "
f"Subscription request to {self.inbox_topic} at {self.broker_addr} "
f"did not succeed after {counter * 0.1} seconds."
)
logger.info("successfully subscribed to topic")
Expand Down Expand Up @@ -373,7 +374,7 @@
actual_mqtt_kwargs = {} if kwargs is None else kwargs
if (
self.inbox_topic
and receiver_addr == self.inbox_topic
and receiver_addr.protocol_addr == self.inbox_topic
and not actual_mqtt_kwargs.get("retain", False)
):
meta.update(
Expand Down
27 changes: 19 additions & 8 deletions tests/integration_tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ def create_test_container(type, init_addr, repl_addr, codec):
broker = ("127.0.0.1", 1883, 60)

clock_man = ExternalClock(5)
clock_ag = ExternalClock()
if type == "tcp":
container_man = create_tcp_container(
addr=init_addr,
codec=codec,
clock=clock_man,
)
container_ag = create_tcp_container(
addr=repl_addr,
codec=codec,
clock=clock_ag,
)
elif type == "mqtt":
container_man = create_mqtt_container(
broker_addr=broker,
Expand All @@ -21,14 +27,6 @@ def create_test_container(type, init_addr, repl_addr, codec):
inbox_topic=init_addr,
transport="tcp",
)
clock_ag = ExternalClock()
if type == "tcp":
container_ag = create_tcp_container(
addr=repl_addr,
codec=codec,
clock=clock_ag,
)
elif type == "mqtt":
container_ag = create_mqtt_container(
broker_addr=broker,
client_id="container_2",
Expand All @@ -37,4 +35,17 @@ def create_test_container(type, init_addr, repl_addr, codec):
inbox_topic=repl_addr,
transport="tcp",
)
elif type == "mqtt_minimal":
container_man = create_mqtt_container(
broker_addr=broker,
client_id=init_addr,
clock=clock_man,
codec=codec,
)
container_ag = create_mqtt_container(
broker_addr=broker,
client_id=repl_addr,
clock=clock_ag,
codec=codec,
)
return container_man, container_ag
16 changes: 14 additions & 2 deletions tests/integration_tests/test_message_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def string_serializer():

async def setup_and_run_test_case(connection_type, codec):
comm_topic = "test_topic"
init_addr = ("127.0.0.1", 1555) if connection_type == "tcp" else None
repl_addr = ("127.0.0.1", 1556) if connection_type == "tcp" else None
init_addr = ("127.0.0.1", 1555) if connection_type == "tcp" else "c1"
repl_addr = ("127.0.0.1", 1556) if connection_type == "tcp" else "c2"

container_1, container_2 = create_test_container(
connection_type, init_addr, repl_addr, codec
Expand Down Expand Up @@ -164,3 +164,15 @@ async def test_mqtt_json():
@pytest.mark.mqtt
async def test_mqtt_proto():
await setup_and_run_test_case("mqtt", PROTO_CODEC)


@pytest.mark.asyncio
@pytest.mark.mqtt
async def test_mqtt_minimal_json():
await asyncio.wait_for(setup_and_run_test_case("mqtt_minimal", JSON_CODEC), 1)


@pytest.mark.asyncio
@pytest.mark.mqtt
async def test_mqtt_minimal_proto():
await asyncio.wait_for(setup_and_run_test_case("mqtt_minimal", PROTO_CODEC), 1)
2 changes: 2 additions & 0 deletions tests/unit_tests/core/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def test_schedule_acl_message():
# THEN
assert agent2.test_counter == 1


def test_register_twice():
c = create_tcp_container(addr=("127.0.0.1", 5555))
agent = MyAgent()
Expand All @@ -110,6 +111,7 @@ def test_register_twice():
with pytest.raises(ValueError):
c.register(agent)


def test_sync_setup_agent():
# this test is not async and therefore does not provide a running event loop
c = create_tcp_container(addr=("127.0.0.1", 5555))
Expand Down