@@ -1576,7 +1576,7 @@ def redeliver_unacknowledged_messages(self):
1576
1576
"""
1577
1577
self ._consumer .redeliver_unacknowledged_messages ()
1578
1578
1579
- def seek (self , messageid ):
1579
+ def seek (self , messageid : Union [ MessageId , _pulsar . MessageId , int ] ):
1580
1580
"""
1581
1581
Reset the subscription associated with this consumer to a specific message id or publish timestamp.
1582
1582
The message id can either be a specific message or represent the first or last messages in the topic.
@@ -1586,10 +1586,10 @@ def seek(self, messageid):
1586
1586
Parameters
1587
1587
----------
1588
1588
1589
- messageid:
1589
+ messageid: MessageId, _pulsar.MessageId or int
1590
1590
The message id for seek, OR an integer event time to seek to
1591
1591
"""
1592
- self ._consumer .seek (messageid )
1592
+ self ._consumer .seek (_seek_arg_convert ( messageid ) )
1593
1593
1594
1594
def close (self ):
1595
1595
"""
@@ -1745,7 +1745,7 @@ def has_message_available(self):
1745
1745
"""
1746
1746
return self ._reader .has_message_available ();
1747
1747
1748
- def seek (self , messageid ):
1748
+ def seek (self , messageid : Union [ MessageId , _pulsar . MessageId , int ] ):
1749
1749
"""
1750
1750
Reset this reader to a specific message id or publish timestamp.
1751
1751
The message id can either be a specific message or represent the first or last messages in the topic.
@@ -1755,10 +1755,10 @@ def seek(self, messageid):
1755
1755
Parameters
1756
1756
----------
1757
1757
1758
- messageid:
1758
+ messageid: MessageId, _pulsar.MessageId or int
1759
1759
The message id for seek, OR an integer event time to seek to
1760
1760
"""
1761
- self ._reader .seek (messageid )
1761
+ self ._reader .seek (_seek_arg_convert ( messageid ) )
1762
1762
1763
1763
def close (self ):
1764
1764
"""
@@ -1829,3 +1829,11 @@ def wrapper(consumer, msg):
1829
1829
m ._schema = schema
1830
1830
listener (c , m )
1831
1831
return wrapper
1832
+
1833
+ def _seek_arg_convert (seek_arg ):
1834
+ if isinstance (seek_arg , MessageId ):
1835
+ return seek_arg ._msg_id
1836
+ elif isinstance (seek_arg , (_pulsar .MessageId , int )):
1837
+ return seek_arg
1838
+ else :
1839
+ raise ValueError (f"invalid seek_arg type { type (seek_arg )} " )
0 commit comments