diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 95cc9d4460651..e5180a5f00471 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -745,6 +745,7 @@ def my_listener(consumer, message): conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) conf.read_compacted(is_read_compacted) + conf.pattern_auto_discovery_period(pattern_auto_discovery_period) if message_listener: conf.message_listener(_listener_wrapper(message_listener, schema)) conf.receiver_queue_size(receiver_queue_size) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 19cbc26cabd6e..0f43a60999ec5 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -18,7 +18,9 @@ # under the License. # - +import re +import logging +import random from unittest import TestCase, main import time import os @@ -53,6 +55,8 @@ TM = 10000 # Do not wait forever in tests +logging.basicConfig(level=os.getenv("LOG_LEVEL", logging.INFO)) + def doHttpPost(url, data): req = Request(url, data.encode()) @@ -254,6 +258,32 @@ def test_deliver_after(self): producer.close() client.close() + def test_pattern_auto_discovery_period(self): + # use random topic to make it likely we're not using existing topics + topic_prefix = f"persistent://public/default/testing{random.randint(0, 99999)}" + client = Client(self.serviceUrl, logger=logging.getLogger("pulsar")) + consumer = client.subscribe( + re.compile(f"{topic_prefix}-.*"), + f"sub_auto_discovery", + consumer_type=ConsumerType.Shared, + pattern_auto_discovery_period=1, + ) + + # create the producer *after* the consumer to ensure use of topic discovery + producer = client.create_producer(f"{topic_prefix}-pattern1") + # wait *before* message sent, otherwise it's not picked up by consumer + time.sleep(1.5) + producer.send(b"hello") + + # Message should be available now + msg = consumer.receive(100) + + self.assertIsNotNone(msg) + self.assertEqual(msg.data(), b"hello") + consumer.unsubscribe() + producer.close() + client.close() + def test_consumer_initial_position(self): client = Client(self.serviceUrl) producer = client.create_producer("consumer-initial-position") @@ -997,8 +1027,6 @@ def test_topics_consumer(self): client.close() def test_topics_pattern_consumer(self): - import re - client = Client(self.serviceUrl) topics_pattern = "persistent://public/default/my-python-pattern-consumer.*" @@ -1024,12 +1052,8 @@ def test_topics_pattern_consumer(self): "my-pattern-consumer-sub", consumer_type=ConsumerType.Shared, receiver_queue_size=10, - pattern_auto_discovery_period=1, ) - # wait enough time to trigger auto discovery - time.sleep(2) - for i in range(100): producer1.send(b"hello-1-%d" % i)