From 5cf010e8d410498e67175dbc4169cc98aafed8db Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 13 Sep 2017 15:28:05 -0700 Subject: [PATCH] Added Python wrapper for C++ reader API (#718) --- pulsar-client-cpp/python/CMakeLists.txt | 3 +- pulsar-client-cpp/python/pulsar.py | 124 ++++++++++++++++++++++++ pulsar-client-cpp/python/pulsar_test.py | 118 +++++++++++++++++++++- pulsar-client-cpp/python/src/client.cc | 15 +++ pulsar-client-cpp/python/src/config.cc | 31 ++++-- pulsar-client-cpp/python/src/message.cc | 25 ++++- pulsar-client-cpp/python/src/pulsar.cc | 2 + pulsar-client-cpp/python/src/reader.cc | 78 +++++++++++++++ 8 files changed, 378 insertions(+), 18 deletions(-) create mode 100644 pulsar-client-cpp/python/src/reader.cc diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt index 2c8fe4232091e..a4ff679236d10 100644 --- a/pulsar-client-cpp/python/CMakeLists.txt +++ b/pulsar-client-cpp/python/CMakeLists.txt @@ -21,7 +21,8 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}") ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc src/config.cc src/enums.cc src/client.cc - src/message.cc src/authentication.cc) + src/message.cc src/authentication.cc + src/reader.cc) SET(CMAKE_SHARED_LIBRARY_PREFIX ) SET(CMAKE_SHARED_LIBRARY_SUFFIX .so) diff --git a/pulsar-client-cpp/python/pulsar.py b/pulsar-client-cpp/python/pulsar.py index 9fb717f0b1e75..756f0038fe58b 100644 --- a/pulsar-client-cpp/python/pulsar.py +++ b/pulsar-client-cpp/python/pulsar.py @@ -104,6 +104,33 @@ def send_callback(res, msg): from _pulsar import Result, CompressionType, ConsumerType, PartitionsRoutingMode # noqa: F401 +class MessageId: + """ + Represents a message id + """ + + 'Represents the earliest message stored in a topic' + earliest = _pulsar.MessageId.earliest + + 'Represents the latest message published on a topic' + latest = _pulsar.MessageId.latest + + def serialize(self): + """ + Returns a string representation of the message id. + This string can be stored and later deserialized. + """ + return self._msg_id.serialize() + + @staticmethod + def deserialize(message_id_str): + """ + Deserialize a message id object from a previously + serialized string. + """ + return _pulsar.MessageId.deserialize(message_id_str) + + class Message: """ Message objects are returned by a consumer, either by calling `receive` or @@ -353,6 +380,66 @@ def my_listener(consumer, message): self._consumers.append(c) return c + def create_reader(self, topic, start_message_id, + reader_listener=None, + receiver_queue_size=1000, + reader_name=None + ): + """ + Create a reader on a particular topic + + **Args** + + * `topic`: The name of the topic. + * `start_message_id`: The initial reader positioning is done by specifying a message id. + The options are: + * `MessageId.earliest`: Start reading from the earliest message available in the topic + * `MessageId.latest`: Start reading from the end topic, only getting messages published + after the reader was created + * `MessageId`: When passing a particular message id, the reader will position itself on + that specific position. The first message to be read will be the message next to the + specified messageId. Message id can be serialized into a string and deserialized + back into a `MessageId` object: + + # Serialize to string + s = msg.message_id().serialize() + + # Deserialize from string + msg_id = MessageId.deserialize(s) + + **Options** + + * `reader_listener`: + Sets a message listener for the reader. When the listener is set, + the application will receive messages through it. Calls to + `reader.read_next()` will not be allowed. The listener function needs + to accept (reader, message), for example: + + def my_listener(reader, message): + # process message + pass + + * `receiver_queue_size`: + Sets the size of the reader receive queue. The reader receive + queue controls how many messages can be accumulated by the reader + before the application calls `read_next()`. Using a higher value could + potentially increase the reader throughput at the expense of higher + memory utilization. + * `reader_name`: + Sets the reader name. + """ + conf = _pulsar.ReaderConfiguration() + if reader_listener: + conf.reader_listener(reader_listener) + conf.receiver_queue_size(receiver_queue_size) + if reader_name: + conf.reader_name(reader_name) + c = Reader() + c._reader = self._client.create_reader(topic, start_message_id, conf) + c._client = self + self._consumers.append(c) + return c + def close(self): """ Close the client and all the associated producers and consumers @@ -574,3 +661,40 @@ def close(self): """ self._consumer.close() self._client._consumers.remove(self) + + +class Reader: + """ + Pulsar topic reader. + """ + + def topic(self): + """ + Return the topic this reader is reading from. + """ + return self._reader.topic() + + def read_next(self, timeout_millis=None): + """ + Read a single message. + + If a message is not immediately available, this method will block until + a new message is available. + + **Options** + + * `timeout_millis`: + If specified, the receive will raise an exception if a message is not + available within the timeout. + """ + if timeout_millis is None: + return self._reader.read_next() + else: + return self._reader.read_next(timeout_millis) + + def close(self): + """ + Close the reader. + """ + self._reader.close() + self._client._consumers.remove(self) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 22d6ba644babb..89f92f45ec2dd 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -21,13 +21,16 @@ from unittest import TestCase, main import time -from pulsar import Client, \ +from pulsar import Client, MessageId, \ CompressionType, ConsumerType from _pulsar import ProducerConfiguration, ConsumerConfiguration class PulsarTest(TestCase): + + serviceUrl = 'pulsar://localhost:8885' + def test_producer_config(self): conf = ProducerConfiguration() conf.send_timeout_millis(12) @@ -51,14 +54,14 @@ def test_consumer_config(self): self.assertEqual(conf.consumer_name(), "my-name") def test_simple_producer(self): - client = Client('pulsar://localhost:6650/') + client = Client(self.serviceUrl) producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic') producer.send('hello') producer.close() client.close() def test_producer_send_async(self): - client = Client('pulsar://localhost:6650/') + client = Client(self.serviceUrl) producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic') sent_messages = [] @@ -75,7 +78,7 @@ def send_callback(producer, msg): client.close() def test_producer_consumer(self): - client = Client('pulsar://localhost:6650/') + client = Client(self.serviceUrl) consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) @@ -95,7 +98,7 @@ def test_producer_consumer(self): client.close() def test_message_listener(self): - client = Client('pulsar://localhost:6650/') + client = Client(self.serviceUrl) received_messages = [] @@ -120,6 +123,111 @@ def listener(consumer, msg): self.assertEqual(received_messages[2].data(), "hello-3") client.close() + def test_reader_simple(self): + client = Client(self.serviceUrl) + reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-simple', + MessageId.earliest) + + producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-simple') + producer.send('hello') + + msg = reader.read_next() + self.assertTrue(msg) + self.assertEqual(msg.data(), 'hello') + + try: + msg = reader.read_next(100) + self.assertTrue(False) # Should not reach this point + except: + pass # Exception is expected + + reader.close() + client.close() + + def test_reader_on_last_message(self): + client = Client(self.serviceUrl) + producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message') + + for i in range(10): + producer.send('hello-%d' % i) + + reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message', + MessageId.latest) + + for i in range(10, 20): + producer.send('hello-%d' % i) + + for i in range(10, 20): + msg = reader.read_next() + self.assertTrue(msg) + self.assertEqual(msg.data(), 'hello-%d' % i) + + reader.close() + client.close() + + def test_reader_on_specific_message(self): + client = Client(self.serviceUrl) + producer = client.create_producer( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message') + + for i in range(10): + producer.send('hello-%d' % i) + + reader1 = client.create_reader( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message', + MessageId.earliest) + + for i in range(5): + msg = reader1.read_next() + last_msg_id = msg.message_id() + + reader2 = client.create_reader( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message', + last_msg_id) + + for i in range(5, 10): + msg = reader2.read_next() + self.assertTrue(msg) + self.assertEqual(msg.data(), 'hello-%d' % i) + + reader1.close() + reader2.close() + client.close() + + def test_reader_on_specific_message_with_batches(self): + client = Client(self.serviceUrl) + producer = client.create_producer( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + batching_enabled=True, + batching_max_publish_delay_ms=1000) + + for i in range(10): + producer.send_async('hello-%d' % i, None) + + # Send one sync message to make sure everything was published + producer.send('hello-10') + + reader1 = client.create_reader( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + MessageId.earliest) + + for i in range(5): + msg = reader1.read_next() + last_msg_id = msg.message_id() + + reader2 = client.create_reader( + 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + last_msg_id) + + for i in range(5, 11): + msg = reader2.read_next() + self.assertTrue(msg) + self.assertEqual(msg.data(), 'hello-%d' % i) + + reader1.close() + reader2.close() + client.close() + if __name__ == '__main__': main() diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc index 8bc191370ba72..ad78ed24a54eb 100644 --- a/pulsar-client-cpp/python/src/client.cc +++ b/pulsar-client-cpp/python/src/client.cc @@ -43,6 +43,20 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s return consumer; } +Reader Client_createReader(Client& client, const std::string& topic, + const BatchMessageId& startMessageId, + const ReaderConfiguration& conf) { + Reader reader; + Result res; + + Py_BEGIN_ALLOW_THREADS + res = client.createReader(topic, startMessageId, conf, reader); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); + return reader; +} + void Client_close(Client& client) { Result res; @@ -59,6 +73,7 @@ void export_client() { class_("Client", init()) .def("create_producer", &Client_createProducer) .def("subscribe", &Client_subscribe) + .def("create_reader", &Client_createReader) .def("close", &Client_close) .def("shutdown", &Client::shutdown) ; diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index bdbd325ac2e8e..b17cd26e2067f 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -18,30 +18,31 @@ */ #include "utils.h" -struct Consumer_MessageListener { +template +struct ListenerWrapper { PyObject* _pyListener; - Consumer_MessageListener(py::object pyListener) : + ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); } - Consumer_MessageListener(const Consumer_MessageListener& other) { + ListenerWrapper(const ListenerWrapper& other) { _pyListener = other._pyListener; Py_XINCREF(_pyListener); } - Consumer_MessageListener& operator=(const Consumer_MessageListener& other) { + ListenerWrapper& operator=(const ListenerWrapper& other) { _pyListener = other._pyListener; Py_XINCREF(_pyListener); return *this; } - virtual ~Consumer_MessageListener() { + virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); } - void operator()(Consumer consumer, const Message& msg) { + void operator()(T consumer, const Message& msg) { PyGILState_STATE state = PyGILState_Ensure(); try { @@ -55,8 +56,14 @@ struct Consumer_MessageListener { }; static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf, - py::object pyListener) { - conf.setMessageListener(Consumer_MessageListener(pyListener)); + py::object pyListener) { + conf.setMessageListener(ListenerWrapper(pyListener)); + return conf; +} + +static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf, + py::object pyListener) { + conf.setReaderListener(ListenerWrapper(pyListener)); return conf; } @@ -124,4 +131,12 @@ void export_config() { .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) ; + + class_("ReaderConfiguration") + .def("message_listener", &ReaderConfiguration_setReaderListener, return_self<>()) + .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) + .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize) + .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy()) + .def("reader_name", &ReaderConfiguration::setReaderName) + ; } diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc index ff4d964acb6b2..e94e0fc625c20 100644 --- a/pulsar-client-cpp/python/src/message.cc +++ b/pulsar-client-cpp/python/src/message.cc @@ -20,18 +20,28 @@ #include -std::string MessageId_str(const MessageId& msgId) { +std::string MessageId_str(const BatchMessageId& msgId) { std::stringstream ss; ss << msgId; return ss.str(); } +std::string MessageId_serialize(const BatchMessageId& msgId) { + std::string serialized; + msgId.serialize(serialized); + return serialized; +} + std::string Message_str(const Message& msg) { std::stringstream ss; ss << msg; return ss.str(); } +const BatchMessageId& Message_getMessageId(const Message& msg) { + return static_cast(msg.getMessageId()); +} + void export_message() { using namespace boost::python; @@ -51,8 +61,15 @@ void export_message() { .def(map_indexing_suite()) ; - class_("MessageId") + static const BatchMessageId& _MessageId_earliest = static_cast(MessageId::earliest()); + static const BatchMessageId& _MessageId_latest = static_cast(MessageId::latest()); + + class_ >("MessageId") .def("__str__", &MessageId_str) + .add_static_property("earliest", make_getter(&_MessageId_earliest)) + .add_static_property("latest", make_getter(&_MessageId_latest)) + .def("serialize", &MessageId_serialize) + .def("deserialize", &MessageId::deserialize).staticmethod("deserialize") ; class_("Message") @@ -61,7 +78,7 @@ void export_message() { .def("length", &Message::getLength) .def("partition_key", &Message::getPartitionKey, return_value_policy()) .def("publish_timestamp", &Message::getPublishTimestamp) - .def("message_id", &Message::getMessageId, return_value_policy()) + .def("message_id", &Message_getMessageId, return_value_policy()) .def("__str__", &Message_str) ; -} \ No newline at end of file +} diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc index 273eb8df4ea34..f3ceefd1cc0e5 100644 --- a/pulsar-client-cpp/python/src/pulsar.cc +++ b/pulsar-client-cpp/python/src/pulsar.cc @@ -22,6 +22,7 @@ void export_client(); void export_message(); void export_producer(); void export_consumer(); +void export_reader(); void export_config(); void export_enums(); void export_authentication(); @@ -46,6 +47,7 @@ BOOST_PYTHON_MODULE(_pulsar) export_message(); export_producer(); export_consumer(); + export_reader(); export_config(); export_enums(); export_authentication(); diff --git a/pulsar-client-cpp/python/src/reader.cc b/pulsar-client-cpp/python/src/reader.cc new file mode 100644 index 0000000000000..81aa78b3da1d7 --- /dev/null +++ b/pulsar-client-cpp/python/src/reader.cc @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "utils.h" + +Message Reader_readNext(Reader& reader) { + Message msg; + Result res; + + while (true) { + Py_BEGIN_ALLOW_THREADS + // Use 100ms timeout to periodically check whether the + // interpreter was interrupted + res = reader.readNext(msg, 100); + Py_END_ALLOW_THREADS + + if (res != ResultTimeout) { + // In case of timeout we keep calling receive() to simulate a + // blocking call until a message is available, while breaking + // every once in a while to check the Python signal status + break; + } + + if (PyErr_CheckSignals() == -1) { + PyErr_SetInterrupt(); + return msg; + } + } + + CHECK_RESULT(res); + return msg; +} + +Message Reader_readNextTimeout(Reader& reader, int timeoutMs) { + Message msg; + Result res; + Py_BEGIN_ALLOW_THREADS + res = reader.readNext(msg, timeoutMs); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); + return msg; +} + +void Reader_close(Reader& reader) { + Result res; + Py_BEGIN_ALLOW_THREADS + res = reader.close(); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); +} + +void export_reader() { + using namespace boost::python; + + class_("Reader", no_init) + .def("topic", &Reader::getTopic, return_value_policy()) + .def("read_next", &Reader_readNext) + .def("read_next", &Reader_readNextTimeout) + .def("close", &Reader_close) + ; +}