Skip to content

Commit 71257dc

Browse files
committed
Support seek a MessageId from pulsar module
1 parent fb4523b commit 71257dc

File tree

2 files changed

+36
-6
lines changed

2 files changed

+36
-6
lines changed

pulsar/__init__.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -1576,7 +1576,7 @@ def redeliver_unacknowledged_messages(self):
15761576
"""
15771577
self._consumer.redeliver_unacknowledged_messages()
15781578

1579-
def seek(self, messageid):
1579+
def seek(self, messageid: Union[MessageId, _pulsar.MessageId, int]):
15801580
"""
15811581
Reset the subscription associated with this consumer to a specific message id or publish timestamp.
15821582
The message id can either be a specific message or represent the first or last messages in the topic.
@@ -1586,10 +1586,10 @@ def seek(self, messageid):
15861586
Parameters
15871587
----------
15881588
1589-
messageid:
1589+
messageid: MessageId, _pulsar.MessageId or int
15901590
The message id for seek, OR an integer event time to seek to
15911591
"""
1592-
self._consumer.seek(messageid)
1592+
self._consumer.seek(_seek_arg_convert(messageid))
15931593

15941594
def close(self):
15951595
"""
@@ -1745,7 +1745,7 @@ def has_message_available(self):
17451745
"""
17461746
return self._reader.has_message_available();
17471747

1748-
def seek(self, messageid):
1748+
def seek(self, messageid: Union[MessageId, _pulsar.MessageId, int]):
17491749
"""
17501750
Reset this reader to a specific message id or publish timestamp.
17511751
The message id can either be a specific message or represent the first or last messages in the topic.
@@ -1755,10 +1755,10 @@ def seek(self, messageid):
17551755
Parameters
17561756
----------
17571757
1758-
messageid:
1758+
messageid: MessageId, _pulsar.MessageId or int
17591759
The message id for seek, OR an integer event time to seek to
17601760
"""
1761-
self._reader.seek(messageid)
1761+
self._reader.seek(_seek_arg_convert(messageid))
17621762

17631763
def close(self):
17641764
"""
@@ -1829,3 +1829,11 @@ def wrapper(consumer, msg):
18291829
m._schema = schema
18301830
listener(c, m)
18311831
return wrapper
1832+
1833+
def _seek_arg_convert(seek_arg):
1834+
if isinstance(seek_arg, MessageId):
1835+
return seek_arg._msg_id
1836+
elif isinstance(seek_arg, (_pulsar.MessageId, int)):
1837+
return seek_arg
1838+
else:
1839+
raise ValueError(f"invalid seek_arg type {type(seek_arg)}")

tests/pulsar_test.py

+22
Original file line numberDiff line numberDiff line change
@@ -1019,11 +1019,22 @@ def test_seek(self):
10191019
msg = consumer.receive(TM)
10201020
self.assertEqual(msg.data(), b"hello-0")
10211021

1022+
# seek with wrong type
1023+
with self.assertRaises(ValueError, msg="invalid seek_arg type <class 'float'>"):
1024+
consumer.seek(1.0)
1025+
10221026
# seek on messageId
10231027
consumer.seek(ids[50])
10241028
msg = consumer.receive(TM)
10251029
self.assertEqual(msg.data(), b"hello-51")
10261030

1031+
# seek on a user provided MessageId
1032+
msg_id = MessageId(ledger_id=ids[60].ledger_id(),
1033+
entry_id=ids[60].entry_id())
1034+
consumer.seek(msg_id)
1035+
msg = consumer.receive(TM)
1036+
self.assertEqual(msg.data(), b"hello-61")
1037+
10271038
# ditto, but seek on timestamp
10281039
consumer.seek(timestamps[42])
10291040
msg = consumer.receive(TM)
@@ -1034,6 +1045,10 @@ def test_seek(self):
10341045
with self.assertRaises(pulsar.Timeout):
10351046
reader.read_next(100)
10361047

1048+
# seek with wrong type
1049+
with self.assertRaises(ValueError, msg="invalid seek_arg type <class 'float'>"):
1050+
consumer.seek(1.0)
1051+
10371052
# earliest
10381053
reader.seek(MessageId.earliest)
10391054
msg = reader.read_next(TM)
@@ -1048,6 +1063,13 @@ def test_seek(self):
10481063
msg = reader.read_next(TM)
10491064
self.assertEqual(msg.data(), b"hello-35")
10501065

1066+
# seek on a user provided MessageId
1067+
msg_id = MessageId(ledger_id=ids[44].ledger_id(),
1068+
entry_id=ids[44].entry_id())
1069+
reader.seek(msg_id)
1070+
msg = reader.read_next(TM)
1071+
self.assertEqual(msg.data(), b"hello-45")
1072+
10511073
# seek on timestamp
10521074
reader.seek(timestamps[79])
10531075
msg = reader.read_next(TM)

0 commit comments

Comments
 (0)