Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ def partition_key(self):
"""
return self._message.partition_key()

def ordering_key(self):
"""
Get the ordering key for the message.
"""
return self._message.ordering_key()

def publish_timestamp(self):
"""
Get the timestamp in milliseconds with the message publish time.
Expand Down Expand Up @@ -1020,7 +1026,7 @@ def my_listener(reader, message):
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""

# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
if isinstance(start_message_id, MessageId):
start_message_id = start_message_id._msg_id
Expand Down Expand Up @@ -1142,6 +1148,7 @@ def last_sequence_id(self):
def send(self, content,
properties=None,
partition_key=None,
ordering_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
Expand All @@ -1164,6 +1171,8 @@ def send(self, content,
partition_key: optional
Sets the partition key for message routing. A hash of this key is used
to determine the message's topic partition.
ordering_key: optional
Sets the ordering key for message routing.
sequence_id: optional
Specify a custom sequence id for the message being published.
replication_clusters: optional
Expand All @@ -1180,14 +1189,15 @@ def send(self, content,
deliver_after: optional
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
return self._producer.send(msg)

def send_async(self, content, callback,
properties=None,
partition_key=None,
ordering_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
Expand Down Expand Up @@ -1239,6 +1249,8 @@ def callback(res, msg_id):
partition_key: optional
Sets the partition key for the message routing. A hash of this key is
used to determine the message's topic partition.
ordering_key: optional
Sets the ordering key for the message routing.
sequence_id: optional
Specify a custom sequence id for the message being published.
replication_clusters: optional
Expand All @@ -1254,7 +1266,7 @@ def callback(res, msg_id):
deliver_after: optional
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
self._producer.send_async(msg, callback)
Expand All @@ -1274,14 +1286,15 @@ def close(self):
"""
self._producer.close()

def _build_msg(self, content, properties, partition_key, sequence_id,
def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after):
data = self._schema.encode(content)

_check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
_check_type_or_none(str, ordering_key, 'ordering_key')
_check_type_or_none(int, sequence_id, 'sequence_id')
_check_type_or_none(list, replication_clusters, 'replication_clusters')
_check_type(bool, disable_replication, 'disable_replication')
Expand All @@ -1296,6 +1309,8 @@ def _build_msg(self, content, properties, partition_key, sequence_id,
mb.property(k, v)
if partition_key:
mb.partition_key(partition_key)
if ordering_key:
mb.ordering_key(ordering_key)
if sequence_id:
mb.sequence_id(sequence_id)
if replication_clusters:
Expand Down
6 changes: 6 additions & 0 deletions pulsar/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ def get_partition_key(self):
"""Returns partition key of the input message is one exists"""
pass

@abstractmethod
def get_ordering_key(self):
"""Returns ordering key of the input message, if one exists"""
pass

@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
Expand All @@ -140,6 +145,7 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p

properties,
partition_key,
ordering_key,
sequence_id,
replication_clusters,
disable_replication,
Expand Down
2 changes: 2 additions & 0 deletions src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void export_message(py::module_& m) {
.def("deliver_after", &MessageBuilder::setDeliverAfter, return_value_policy::reference)
.def("deliver_at", &MessageBuilder::setDeliverAt, return_value_policy::reference)
.def("partition_key", &MessageBuilder::setPartitionKey, return_value_policy::reference)
.def("ordering_key", &MessageBuilder::setOrderingKey, return_value_policy::reference)
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
.def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
Expand Down Expand Up @@ -87,6 +88,7 @@ void export_message(py::module_& m) {
.def("data", [](const Message& msg) { return bytes(msg.getDataAsString()); })
.def("length", &Message::getLength)
.def("partition_key", &Message::getPartitionKey, return_value_policy::copy)
.def("ordering_key", &Message::getOrderingKey, return_value_policy::copy)
.def("publish_timestamp", &Message::getPublishTimestamp)
.def("event_timestamp", &Message::getEventTimestamp)
.def("message_id", &Message::getMessageId, return_value_policy::copy)
Expand Down
17 changes: 17 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,23 @@ def test_producer_consumer(self):
consumer.unsubscribe()
client.close()

def test_ordering_key(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
"my-python-topic-ordering-key", "my-sub", consumer_type=ConsumerType.KeyShared
)
producer = client.create_producer("my-python-topic-ordering-key")
producer.send(b"ordered-hello", ordering_key="random-ordering-key")

# Message should be available immediately with ordering key set
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"ordered-hello")
self.assertEqual(msg.ordering_key(), "random-ordering-key")
consumer.unsubscribe()
producer.close()
client.close()

def test_redelivery_count(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(
Expand Down