diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 445d477..1bffc08 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -23,10 +23,16 @@ import asyncio import functools -from typing import Any +import logging +from typing import Any, Iterable, Optional, Union, Tuple import _pulsar + import pulsar +from pulsar import Message, _listener_wrapper + +from pulsar import schema +_schema = schema class PulsarException(BaseException): """ @@ -56,6 +62,7 @@ def __str__(self): """ return f'{self._result.value} {self._result.name}' + class Producer: """ The Pulsar message producer, used to publish messages on a topic. @@ -116,6 +123,189 @@ async def close(self) -> None: self._producer.close_async(functools.partial(_set_future, future, value=None)) await future + async def flush(self): + """ + Flush all the messages buffered in the client and wait until all messages have been successfully persisted. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._producer.flush_async(functools.partial(_set_future, future, value=None)) + await future + + @property + def is_connected(self) -> bool: + """ + Check if the producer is connected or not. + """ + + return self._producer.is_connected() + + @property + def last_sequence_id(self) -> int: + """ + Get the last sequence id + """ + return self._producer.last_sequence_id() + + @property + def name(self) -> str: + """ + Get the name of the producer. + """ + return self._producer.producer_name() + + @property + def topic(self) -> str: + """ + Get the topic name of the producer. + """ + return self._producer.topic() + + +class Consumer: + def __init__(self, consumer: _pulsar.Consumer): + self._consumer: _pulsar.Consumer = consumer + + def _prepare_logger(logger): + import logging + def log(level, message): + old_threads = logging.logThreads + logging.logThreads = False + logger.log(logging.getLevelName(level), message) + logging.logThreads = old_threads + return log + + async def acknowledge(self, msg: pulsar.Message) -> None: + """ + Acknowledge the reception of a single message. + """ + future = asyncio.get_running_loop().create_future() + self._consumer.acknowledge_async(msg._message, functools.partial(_set_future, future)) + await future + + async def acknowledge_cumulative(self, msg: pulsar.Message) -> None: + """ + Acknowledge the reception of all the messages in the stream up to (and including) the provided message. + """ + future = asyncio.get_running_loop().create_future() + self._consumer.acknowledge_cumulative_async(msg, functools.partial(_set_future, future)) + await future + + async def negative_acknowledge(self, msg: pulsar.Message) -> None: + """ + Acknowledge the failure to process a single message. + """ + self._consumer.negative_acknowledge(msg._message) + + async def batch_receive(self) -> Iterable[pulsar.Message]: + """ + Batch receiving messages. + """ + future = asyncio.get_running_loop().create_future() + self._consumer.batch_receive_async(functools.partial(_set_future, future)) + return await future + + async def receive(self) -> pulsar.Message: + """ + Receive a single message. + """ + future = asyncio.get_running_loop().create_future() + + self._consumer.receive_async(functools.partial(_set_future, future)) + msg = await future + + m = Message() + m._message = msg + m._schema = self._schema + return m + + async def close(self): + """ + Close the consumer. + """ + future = asyncio.get_running_loop().create_future() + self._consumer.close_async(functools.partial(_set_future, future, value=None)) + await future + + async def seek(self, position: Union[Tuple[int, int, int, int], pulsar.MessageId]): + """ + Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ... + """ + if isinstance(position, tuple): + partition, ledger_id, entry_id, batch_index = position + message_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) + else: + message_id = position + + future = asyncio.get_running_loop().create_future() + self._consumer.seek_async(message_id, functools.partial(_set_future, future)) + await future + + async def unsubscribe(self): + """ + Unsubscribe the current consumer from the topic. + """ + future = asyncio.get_running_loop().create_future() + self._consumer.unsubscribe_async(functools.partial(_set_future, future)) + await future + + def pause_message_listener(self): + """ + Pause receiving messages via the message_listener until resume_message_listener() is called. + """ + self._consumer.pause_message_listener() + + def resume_message_listener(self): + """ + Resume receiving the messages via the message listener. Asynchronously receive all the messages enqueued from the time pause_message_listener() was called. + """ + self._consumer.resume_message_listener() + + def redeliver_unacknowledged_messages(self): + """ + Redelivers all the unacknowledged messages. In failover mode, the request is ignored if the consumer is not active for the given topic. In shared mode, the consumer's messages to be redelivered are distributed across all the connected consumers... + """ + self._consumer.redeliver_unacknowledged_messages() + + @property + def last_message_id(self) -> pulsar.MessageId: + """ + MessageId of the last consumed message + """ + return self._consumer.get_last_message_id() + + @property + def is_connected(self) -> bool: + """ + True if the consumer is connected to a broker + """ + return self._consumer.is_connected() + + @property + def subscription_name(self) -> str: + """ + Name of the current subscription + """ + return self._consumer.subscription_name() + + @property + def topic(self) -> str: + """ + Topic(s) of consumer + """ + return self._consumer.topic() + + @property + def consumer_name(self) -> str: + """ + Name of consumer + """ + return self._consumer.consumer_name() + + class Client: """ The asynchronous version of `pulsar.Client`. @@ -125,9 +315,120 @@ def __init__(self, service_url, **kwargs) -> None: """ See `pulsar.Client.__init__` """ - self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client + assert service_url.startswith('pulsar://'), "The service url must start with 'pulsar://'" + self._client = pulsar.Client(service_url, **kwargs)._client + self._consumers = [] + + async def subscribe(self, topic, subscription_name, + consumer_type: _pulsar.ConsumerType = _pulsar.ConsumerType.Exclusive, + schema=pulsar.schema.BytesSchema(), + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + negative_ack_redelivery_delay_ms=60000, + is_read_compacted=False, + properties=None, + pattern_auto_discovery_period=60, + initial_position: _pulsar.InitialPosition = _pulsar.InitialPosition.Latest, + crypto_key_reader: Union[None, _pulsar.CryptoKeyReader] = None, + replicate_subscription_state_enabled=False, + max_pending_chunked_message=10, + auto_ack_oldest_chunked_message_on_queue_full=False, + start_message_id_inclusive=False, + batch_receive_policy=None, + key_shared_policy=None, + batch_index_ack_enabled=False, + regex_subscription_mode: _pulsar.RegexSubscriptionMode = _pulsar.RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: Union[None, pulsar.ConsumerDeadLetterPolicy] = None,) -> Consumer: + conf = _pulsar.ConsumerConfiguration() + conf.consumer_type(consumer_type) + conf.regex_subscription_mode(regex_subscription_mode) + conf.read_compacted(is_read_compacted) + + + if message_listener: + conf.message_listener(_listener_wrapper(message_listener, schema)) + conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) + if consumer_name: + conf.consumer_name(consumer_name) + if unacked_messages_timeout_ms: + conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) + + conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) + conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + if properties: + for k, v in properties.items(): + conf.property(k, v) + conf.subscription_initial_position(initial_position) + + conf.schema(schema.schema_info()) + + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + + conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) + conf.max_pending_chunked_message(max_pending_chunked_message) + conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) + conf.start_message_id_inclusive(start_message_id_inclusive) + if batch_receive_policy: + conf.batch_receive_policy(batch_receive_policy.policy()) + + if key_shared_policy: + conf.key_shared_policy(key_shared_policy.policy()) + conf.batch_index_ack_enabled(batch_index_ack_enabled) + if dead_letter_policy: + conf.dead_letter_policy(dead_letter_policy.policy()) + + future = asyncio.get_running_loop().create_future() + + c = Consumer(None) + if isinstance(topic, str): + self._client.subscribe_async(topic, subscription_name, conf, functools.partial(_set_future, future)) + c._consumer = await future + elif isinstance(topic, list): + self._client.subscribe_topics_async(topic, subscription_name, conf, functools.partial(_set_future, future)) + c._consumer = await future + elif isinstance(topic, pulsar._retype): + self._client.subscribe_pattern_async(topic, subscription_name, conf, functools.partial(_set_future, future)) + c._consumer = await future + else: + raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") - async def create_producer(self, topic: str) -> Producer: + c._client = self + c._schema = schema + c._schema.attach_client(self._client) + + self._consumers.append(c) + + return c + + async def create_producer(self, topic, + producer_name=None, + schema=pulsar.schema.BytesSchema(), + initial_sequence_id=None, + send_timeout_millis=30000, + compression_type: _pulsar.CompressionType = _pulsar.CompressionType.NONE, + max_pending_messages=1000, + max_pending_messages_across_partitions=50000, + block_if_queue_full=False, + batching_enabled=False, + batching_max_messages=1000, + batching_max_allowed_size_in_bytes=128*1024, + batching_max_publish_delay_ms=10, + chunking_enabled=False, + message_routing_mode: _pulsar.PartitionsRoutingMode = _pulsar.PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers=False, + properties=None, + batching_type: _pulsar.BatchingType = _pulsar.BatchingType.Default, + encryption_key=None, + crypto_key_reader: Union[None, _pulsar.CryptoKeyReader] = None, + access_mode: _pulsar.ProducerAccessMode = _pulsar.ProducerAccessMode.Shared, + + ) -> Producer: """ Create a new producer on a given topic @@ -146,8 +447,39 @@ async def create_producer(self, topic: str) -> Producer: PulsarException """ future = asyncio.get_running_loop().create_future() + conf = _pulsar.ProducerConfiguration() - # TODO: add more configs + conf.send_timeout_millis(send_timeout_millis) + conf.compression_type(compression_type) + conf.max_pending_messages(max_pending_messages) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) + conf.block_if_queue_full(block_if_queue_full) + conf.batching_enabled(batching_enabled) + conf.batching_max_messages(batching_max_messages) + conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) + conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) + conf.partitions_routing_mode(message_routing_mode) + conf.batching_type(batching_type) + conf.chunking_enabled(chunking_enabled) + conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) + conf.access_mode(access_mode) + if producer_name: + conf.producer_name(producer_name) + if initial_sequence_id: + conf.initial_sequence_id(initial_sequence_id) + if properties: + for k, v in properties.items(): + conf.property(k, v) + + conf.schema(schema.schema_info()) + if encryption_key: + conf.encryption_key(encryption_key) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + + if batching_enabled and chunking_enabled: + raise ValueError("Batching and chunking can“t be enabled at the same time") + self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) return Producer(await future) @@ -163,10 +495,21 @@ async def close(self) -> None: self._client.close_async(functools.partial(_set_future, future, value=None)) await future -def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): + async def get_topic_partitions(self, topic: str): + future = asyncio.get_running_loop().create_future() + self._client.get_partitions_for_topic_async(topic, functools.partial(_set_future, future)) + return await future + + def shutdown(self) -> None: + self._client.shutdown() + + +def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Optional[Any] = None): def complete(): if result == _pulsar.Result.Ok: future.set_result(value) else: future.set_exception(PulsarException(result)) + future.get_loop().call_soon_threadsafe(complete) + diff --git a/src/client.cc b/src/client.cc index b25c63a..a9d6139 100644 --- a/src/client.cc +++ b/src/client.cc @@ -21,6 +21,7 @@ #include #include #include +#include namespace py = pybind11; @@ -41,12 +42,23 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s [&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); }); } +void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeAsync(topic, subscriptionName, conf, callback); +} + Consumer Client_subscribe_topics(Client& client, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf) { return waitForAsyncValue( [&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); }); } +void Client_subscribe_topicsAsync(Client& client, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){ + py::gil_scoped_release release; + client.subscribeAsync(topics, subscriptionName, conf, callback); +} + Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, const ConsumerConfiguration& conf) { return waitForAsyncValue([&](SubscribeCallback callback) { @@ -54,6 +66,11 @@ Consumer Client_subscribe_pattern(Client& client, const std::string& topic_patte }); } +void Client_subscribe_patternAsync(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){ + py::gil_scoped_release release; + client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback); +} + Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf) { return waitForAsyncValue( @@ -86,8 +103,11 @@ void export_client(py::module_& m) { .def("create_producer", &Client_createProducer) .def("create_producer_async", &Client_createProducerAsync) .def("subscribe", &Client_subscribe) + .def("subscribe_async", &Client_subscribeAsync) .def("subscribe_topics", &Client_subscribe_topics) + .def("subscribe_topics_async", &Client_subscribe_topicsAsync) .def("subscribe_pattern", &Client_subscribe_pattern) + .def("subscribe_pattern_async", &Client_subscribe_patternAsync) .def("create_reader", &Client_createReader) .def("get_topic_partitions", &Client_getTopicPartitions) .def("get_schema_info", &Client_getSchemaInfo) diff --git a/src/consumer.cc b/src/consumer.cc index e32a865..57ef5a1 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -16,11 +16,16 @@ * specific language governing permissions and limitations * under the License. */ + #include "utils.h" #include +#include +#include +#include #include #include +#include namespace py = pybind11; @@ -28,10 +33,22 @@ void Consumer_unsubscribe(Consumer& consumer) { waitForAsyncResult([&consumer](ResultCallback callback) { consumer.unsubscribeAsync(callback); }); } +void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) { + consumer.unsubscribeAsync([callback] (Result result) { + py::gil_scoped_acquire acquire; + callback(result); + }); +} + Message Consumer_receive(Consumer& consumer) { return waitForAsyncValue([&](ReceiveCallback callback) { consumer.receiveAsync(callback); }); } +void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) { + py::gil_scoped_acquire acquire; + consumer.receiveAsync(callback); +} + Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { Message msg; Result res; @@ -42,6 +59,7 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { return msg; } +// TODO: implement async variant Messages Consumer_batch_receive(Consumer& consumer) { Messages msgs; Result res; @@ -50,14 +68,39 @@ Messages Consumer_batch_receive(Consumer& consumer) { return msgs; } +void Consumer_batch_receive_async(Consumer& consumer, BatchReceiveCallback callback){ + consumer.batchReceiveAsync([callback](pulsar::Result result, pulsar::Messages messages){ + py::gil_scoped_acquire acquire; + callback(result, messages); + }); +} + void Consumer_acknowledge(Consumer& consumer, const Message& msg) { waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); }); } +void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, py::object callback){ + auto py_callback = std::make_shared(callback); + + consumer.acknowledgeAsync(msg, [py_callback](pulsar::Result result){ + py::gil_scoped_acquire acquire; + (*py_callback)(result, py::none()); + }); +} + void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); }); } +void Consumer_acknowledge_message_id_Async(Consumer& consumer, const MessageId& msgId, py::object callback){ + auto py_callback = std::make_shared(callback); + + consumer.acknowledgeAsync(msgId, [py_callback](pulsar::Result result){ + py::gil_scoped_acquire acquire; + (*py_callback)(result, py::none()); + }); +} + void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg); Py_END_ALLOW_THREADS @@ -72,6 +115,16 @@ void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msg, callback); }); } +void Consumer_acknowledge_cumulativeAsync(Consumer& consumer, const Message& msg, py::object callback){ + auto py_callback = std::make_shared(callback); + + consumer.acknowledgeCumulativeAsync(msg, [py_callback](pulsar::Result result){ + py::gil_scoped_acquire acquire; + (*py_callback)(result); + }); +} + +// TODO: implement async variant void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) { waitForAsyncResult( [&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msgId, callback); }); @@ -81,14 +134,28 @@ void Consumer_close(Consumer& consumer) { waitForAsyncResult([&consumer](ResultCallback callback) { consumer.closeAsync(callback); }); } +void Consumer_closeAsync(Consumer& consumer, ResultCallback callback){ + py::gil_scoped_acquire acquire; + consumer.closeAsync(callback); +} + void Consumer_pauseMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.pauseMessageListener()); } void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); } +// TODO: implement async variant void Consumer_seek(Consumer& consumer, const MessageId& msgId) { waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); }); } +void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback){ + consumer.seekAsync(msgId, [callback](pulsar::Result result){ + py::gil_scoped_acquire acquire; + callback(result); + }); +} + +// TODO: implement async variant void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { waitForAsyncResult( [timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); }); @@ -114,21 +181,29 @@ void export_consumer(py::module_& m) { .def("subscription_name", &Consumer::getSubscriptionName, py::return_value_policy::copy) .def("consumer_name", &Consumer::getConsumerName, py::return_value_policy::copy) .def("unsubscribe", &Consumer_unsubscribe) + .def("unsubscribe_async", &Consumer_unsubscribeAsync) .def("receive", &Consumer_receive) .def("receive", &Consumer_receive_timeout) + .def("receive_async", &Consumer_receiveAsync) .def("batch_receive", &Consumer_batch_receive) + .def("batch_receive_async", &Consumer_batch_receive_async) .def("acknowledge", &Consumer_acknowledge) .def("acknowledge", &Consumer_acknowledge_message_id) + .def("acknowledge_async", &Consumer_acknowledgeAsync) + .def("acknowledge_async", &Consumer_acknowledge_message_id_Async) .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id) + .def("acknowledge_cumulative_async", &Consumer_acknowledge_cumulativeAsync) .def("negative_acknowledge", &Consumer_negative_acknowledge) .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id) .def("close", &Consumer_close) + .def("close_async", &Consumer_closeAsync) .def("pause_message_listener", &Consumer_pauseMessageListener) .def("resume_message_listener", &Consumer_resumeMessageListener) .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages) .def("seek", &Consumer_seek) .def("seek", &Consumer_seek_timestamp) + .def("seek_async", Consumer_seekAsync) .def("is_connected", &Consumer_is_connected) .def("get_last_message_id", &Consumer_get_last_message_id); } diff --git a/src/producer.cc b/src/producer.cc index 9b38016..f9f524b 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -38,6 +38,7 @@ void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback cal } } +// TODO: implement async variant void Producer_flush(Producer& producer) { waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); }); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index fe6877f..b6093de 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -19,17 +19,22 @@ # import asyncio +from typing import Iterable + +from _pulsar import ConsumerType + import pulsar from pulsar.asyncio import ( Client, PulsarException, + Consumer ) from unittest import ( main, IsolatedAsyncioTestCase, ) -service_url = 'pulsar://localhost:6650' +service_url = 'pulsar://localhost' class AsyncioTest(IsolatedAsyncioTestCase): @@ -55,15 +60,13 @@ async def test_batch_send(self): print(f'{i} was sent to {msg_id}') self.assertIsInstance(msg_id, pulsar.MessageId) self.assertEqual(msg_ids[i].ledger_id(), ledger_id) - self.assertEqual(msg_ids[i].entry_id(), entry_id) - self.assertEqual(msg_ids[i].batch_index(), i) async def test_create_producer_failure(self): try: await self._client.create_producer('tenant/ns/awaitio-test-send-failure') self.fail() except PulsarException as e: - self.assertEqual(e.error(), pulsar.Result.Timeout) + self.assertEqual(e.error(), pulsar.Result.TopicNotFound) async def test_send_failure(self): producer = await self._client.create_producer('awaitio-test-send-failure') @@ -82,5 +85,53 @@ async def test_close_producer(self): except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) + async def test_subscribe(self): + consumer = await self._client.subscribe('awaitio-test-close-producer', 'test-subscription') + self.assertIsInstance(consumer, Consumer) + + async def test_read_and_ack(self): + test_producer = await self._client.create_producer("awaitio-test-consumer-ack") + consumer = await self._client.subscribe('awaitio-test-consumer-ack', 'test-subscription') + + await test_producer.send(b"test123") + msg = await consumer.receive() + + self.assertEqual(msg.data(), b"test123") + + await consumer.acknowledge(msg) + + async def test_batch_read_and_ack(self): + test_producer = await self._client.create_producer("awaitio-test-consumer-ack-batch") + consumer = await self._client.subscribe('awaitio-test-consumer-ack-batch', 'test-subscription') + + await test_producer.send(b"test123") + msgs = await consumer.batch_receive() + + last = None + for msg in msgs: + last = msg + + await consumer.acknowledge_cumulative(last) + + self.assertIsInstance(msgs, Iterable) + for msg in msgs: + self.assertEqual(b"test123", msg.data()) + + async def test_consumer_close(self): + consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription') + await consumer.close() + + self.assertFalse(consumer.is_connected) + + async def test_consumer_seek(self): + consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription') + await consumer.seek(consumer.last_message_id) + + async def test_consumer_unsubscribe(self): + consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription') + await consumer.unsubscribe() + + + if __name__ == '__main__': main()