Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add properties for client.publish() #29

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mqttproto/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class InsufficientData(MQTTDecodeError):

class MQTTUnsupportedPropertyType(MQTTDecodeError):
"""
Raised when decoding an MQTT packet and it contains a property of a type not
Raised when decoding or encoding an MQTT packet and it contains a property of a type not
supported by that packet type.
"""

Expand Down
3 changes: 3 additions & 0 deletions src/mqttproto/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ class PropertiesMixin:
def encode_properties(self, buffer: bytearray) -> None:
internal_buffer = bytearray()
for identifier, value in self.properties.items():
if identifier not in self.allowed_property_types:
raise MQTTUnsupportedPropertyType(identifier, self.__class__)

encode_variable_integer(identifier, internal_buffer)
identifier.encoder(value, internal_buffer)

Expand Down
6 changes: 5 additions & 1 deletion src/mqttproto/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
MQTTSubscribeAckPacket,
MQTTUnsubscribeAckPacket,
PropertyType,
PropertyValue,
QoS,
ReasonCode,
RetainHandling,
Expand Down Expand Up @@ -498,6 +499,7 @@ async def publish(
*,
qos: QoS = QoS.AT_MOST_ONCE,
retain: bool = False,
properties: dict[PropertyType, PropertyValue] | None = None,
) -> None:
"""
Publish a message to the given topic.
Expand All @@ -510,7 +512,9 @@ async def publish(
before the subscription happened

"""
packet_id = self._state_machine.publish(topic, payload, qos=qos, retain=retain)
packet_id = self._state_machine.publish(
topic, payload, qos=qos, retain=retain, properties=properties
)
if qos is QoS.EXACTLY_ONCE:
assert packet_id is not None
await self._run_operation(MQTTQoS2PublishOperation(packet_id))
Expand Down
9 changes: 8 additions & 1 deletion src/mqttproto/client_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MQTTUnsubscribeAckPacket,
MQTTUnsubscribePacket,
PropertyType,
PropertyValue,
QoS,
ReasonCode,
Subscription,
Expand Down Expand Up @@ -153,6 +154,7 @@ def publish(
*,
qos: QoS = QoS.AT_MOST_ONCE,
retain: bool = False,
properties: dict[PropertyType, PropertyValue] | None = None,
) -> int | None:
"""
Send a ``PUBLISH`` request.
Expand All @@ -171,7 +173,12 @@ def publish(
self._out_require_state(MQTTClientState.CONNECTED)
packet_id = self._generate_packet_id() if qos > QoS.AT_MOST_ONCE else None
packet = MQTTPublishPacket(
topic=topic, payload=payload, qos=qos, retain=retain, packet_id=packet_id
topic=topic,
payload=payload,
qos=qos,
retain=retain,
packet_id=packet_id,
properties=properties if properties is not None else {},
)
packet.encode(self._out_buffer)
if packet_id is not None:
Expand Down
14 changes: 12 additions & 2 deletions src/mqttproto/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
from anyio.from_thread import BlockingPortal, BlockingPortalProvider
from attrs import define

from ._types import MQTTPublishPacket, QoS, RetainHandling, Will
from ._types import (
MQTTPublishPacket,
PropertyType,
PropertyValue,
QoS,
RetainHandling,
Will,
)
from .async_client import AsyncMQTTClient, AsyncMQTTSubscription

if sys.version_info >= (3, 11):
Expand Down Expand Up @@ -104,9 +111,12 @@ def publish(
*,
qos: QoS = QoS.AT_MOST_ONCE,
retain: bool = False,
properties: dict[PropertyType, PropertyValue] | None = None,
) -> None:
return self._portal.call(
lambda: self._async_client.publish(topic, payload, qos=qos, retain=retain)
lambda: self._async_client.publish(
topic, payload, qos=qos, retain=retain, properties=properties
)
)

@contextmanager
Expand Down
Loading