Skip to content

Commit 9047170

Browse files
authored
Added missing publish option ordering_key (#152)
The `ordering_key` was not available to set when publishing messages. This has been added, aping how partition_key is set. Added test to cover new functionality.
1 parent 8c36eb7 commit 9047170

File tree

4 files changed

+44
-4
lines changed

4 files changed

+44
-4
lines changed

pulsar/__init__.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ def partition_key(self):
146146
"""
147147
return self._message.partition_key()
148148

149+
def ordering_key(self):
150+
"""
151+
Get the ordering key for the message.
152+
"""
153+
return self._message.ordering_key()
154+
149155
def publish_timestamp(self):
150156
"""
151157
Get the timestamp in milliseconds with the message publish time.
@@ -1020,7 +1026,7 @@ def my_listener(reader, message):
10201026
Symmetric encryption class implementation, configuring public key encryption messages for the producer
10211027
and private key decryption messages for the consumer
10221028
"""
1023-
1029+
10241030
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
10251031
if isinstance(start_message_id, MessageId):
10261032
start_message_id = start_message_id._msg_id
@@ -1142,6 +1148,7 @@ def last_sequence_id(self):
11421148
def send(self, content,
11431149
properties=None,
11441150
partition_key=None,
1151+
ordering_key=None,
11451152
sequence_id=None,
11461153
replication_clusters=None,
11471154
disable_replication=False,
@@ -1164,6 +1171,8 @@ def send(self, content,
11641171
partition_key: optional
11651172
Sets the partition key for message routing. A hash of this key is used
11661173
to determine the message's topic partition.
1174+
ordering_key: optional
1175+
Sets the ordering key for message routing.
11671176
sequence_id: optional
11681177
Specify a custom sequence id for the message being published.
11691178
replication_clusters: optional
@@ -1180,14 +1189,15 @@ def send(self, content,
11801189
deliver_after: optional
11811190
Specify a delay in timedelta for the delivery of the messages.
11821191
"""
1183-
msg = self._build_msg(content, properties, partition_key, sequence_id,
1192+
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
11841193
replication_clusters, disable_replication, event_timestamp,
11851194
deliver_at, deliver_after)
11861195
return self._producer.send(msg)
11871196

11881197
def send_async(self, content, callback,
11891198
properties=None,
11901199
partition_key=None,
1200+
ordering_key=None,
11911201
sequence_id=None,
11921202
replication_clusters=None,
11931203
disable_replication=False,
@@ -1239,6 +1249,8 @@ def callback(res, msg_id):
12391249
partition_key: optional
12401250
Sets the partition key for the message routing. A hash of this key is
12411251
used to determine the message's topic partition.
1252+
ordering_key: optional
1253+
Sets the ordering key for the message routing.
12421254
sequence_id: optional
12431255
Specify a custom sequence id for the message being published.
12441256
replication_clusters: optional
@@ -1254,7 +1266,7 @@ def callback(res, msg_id):
12541266
deliver_after: optional
12551267
Specify a delay in timedelta for the delivery of the messages.
12561268
"""
1257-
msg = self._build_msg(content, properties, partition_key, sequence_id,
1269+
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
12581270
replication_clusters, disable_replication, event_timestamp,
12591271
deliver_at, deliver_after)
12601272
self._producer.send_async(msg, callback)
@@ -1274,14 +1286,15 @@ def close(self):
12741286
"""
12751287
self._producer.close()
12761288

1277-
def _build_msg(self, content, properties, partition_key, sequence_id,
1289+
def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
12781290
replication_clusters, disable_replication, event_timestamp,
12791291
deliver_at, deliver_after):
12801292
data = self._schema.encode(content)
12811293

12821294
_check_type(bytes, data, 'data')
12831295
_check_type_or_none(dict, properties, 'properties')
12841296
_check_type_or_none(str, partition_key, 'partition_key')
1297+
_check_type_or_none(str, ordering_key, 'ordering_key')
12851298
_check_type_or_none(int, sequence_id, 'sequence_id')
12861299
_check_type_or_none(list, replication_clusters, 'replication_clusters')
12871300
_check_type(bool, disable_replication, 'disable_replication')
@@ -1296,6 +1309,8 @@ def _build_msg(self, content, properties, partition_key, sequence_id,
12961309
mb.property(k, v)
12971310
if partition_key:
12981311
mb.partition_key(partition_key)
1312+
if ordering_key:
1313+
mb.ordering_key(ordering_key)
12991314
if sequence_id:
13001315
mb.sequence_id(sequence_id)
13011316
if replication_clusters:

pulsar/functions/context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ def get_partition_key(self):
125125
"""Returns partition key of the input message is one exists"""
126126
pass
127127

128+
@abstractmethod
129+
def get_ordering_key(self):
130+
"""Returns ordering key of the input message, if one exists"""
131+
pass
132+
128133
@abstractmethod
129134
def record_metric(self, metric_name, metric_value):
130135
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
@@ -140,6 +145,7 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p
140145
141146
properties,
142147
partition_key,
148+
ordering_key,
143149
sequence_id,
144150
replication_clusters,
145151
disable_replication,

src/message.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ void export_message(py::module_& m) {
4242
.def("deliver_after", &MessageBuilder::setDeliverAfter, return_value_policy::reference)
4343
.def("deliver_at", &MessageBuilder::setDeliverAt, return_value_policy::reference)
4444
.def("partition_key", &MessageBuilder::setPartitionKey, return_value_policy::reference)
45+
.def("ordering_key", &MessageBuilder::setOrderingKey, return_value_policy::reference)
4546
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
4647
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
4748
.def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
@@ -87,6 +88,7 @@ void export_message(py::module_& m) {
8788
.def("data", [](const Message& msg) { return bytes(msg.getDataAsString()); })
8889
.def("length", &Message::getLength)
8990
.def("partition_key", &Message::getPartitionKey, return_value_policy::copy)
91+
.def("ordering_key", &Message::getOrderingKey, return_value_policy::copy)
9092
.def("publish_timestamp", &Message::getPublishTimestamp)
9193
.def("event_timestamp", &Message::getEventTimestamp)
9294
.def("message_id", &Message::getMessageId, return_value_policy::copy)

tests/pulsar_test.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,23 @@ def test_producer_consumer(self):
251251
consumer.unsubscribe()
252252
client.close()
253253

254+
def test_ordering_key(self):
255+
client = Client(self.serviceUrl)
256+
consumer = client.subscribe(
257+
"my-python-topic-ordering-key", "my-sub", consumer_type=ConsumerType.KeyShared
258+
)
259+
producer = client.create_producer("my-python-topic-ordering-key")
260+
producer.send(b"ordered-hello", ordering_key="random-ordering-key")
261+
262+
# Message should be available immediately with ordering key set
263+
msg = consumer.receive(TM)
264+
self.assertTrue(msg)
265+
self.assertEqual(msg.data(), b"ordered-hello")
266+
self.assertEqual(msg.ordering_key(), "random-ordering-key")
267+
consumer.unsubscribe()
268+
producer.close()
269+
client.close()
270+
254271
def test_redelivery_count(self):
255272
client = Client(self.serviceUrl)
256273
consumer = client.subscribe(

0 commit comments

Comments
 (0)