Skip to content

Commit

Permalink
Added Python wrapper for C++ reader API (#718)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 13, 2017
1 parent c19a1ce commit 5cf010e
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 18 deletions.
3 changes: 2 additions & 1 deletion pulsar-client-cpp/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}")

ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc
src/config.cc src/enums.cc src/client.cc
src/message.cc src/authentication.cc)
src/message.cc src/authentication.cc
src/reader.cc)
SET(CMAKE_SHARED_LIBRARY_PREFIX )
SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)

Expand Down
124 changes: 124 additions & 0 deletions pulsar-client-cpp/python/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,33 @@ def send_callback(res, msg):
from _pulsar import Result, CompressionType, ConsumerType, PartitionsRoutingMode # noqa: F401


class MessageId:
"""
Represents a message id
"""

'Represents the earliest message stored in a topic'
earliest = _pulsar.MessageId.earliest

'Represents the latest message published on a topic'
latest = _pulsar.MessageId.latest

def serialize(self):
"""
Returns a string representation of the message id.
This string can be stored and later deserialized.
"""
return self._msg_id.serialize()

@staticmethod
def deserialize(message_id_str):
"""
Deserialize a message id object from a previously
serialized string.
"""
return _pulsar.MessageId.deserialize(message_id_str)


class Message:
"""
Message objects are returned by a consumer, either by calling `receive` or
Expand Down Expand Up @@ -353,6 +380,66 @@ def my_listener(consumer, message):
self._consumers.append(c)
return c

def create_reader(self, topic, start_message_id,
reader_listener=None,
receiver_queue_size=1000,
reader_name=None
):
"""
Create a reader on a particular topic
**Args**
* `topic`: The name of the topic.
* `start_message_id`: The initial reader positioning is done by specifying a message id.
The options are:
* `MessageId.earliest`: Start reading from the earliest message available in the topic
* `MessageId.latest`: Start reading from the end topic, only getting messages published
after the reader was created
* `MessageId`: When passing a particular message id, the reader will position itself on
that specific position. The first message to be read will be the message next to the
specified messageId. Message id can be serialized into a string and deserialized
back into a `MessageId` object:
# Serialize to string
s = msg.message_id().serialize()
# Deserialize from string
msg_id = MessageId.deserialize(s)
**Options**
* `reader_listener`:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
`reader.read_next()` will not be allowed. The listener function needs
to accept (reader, message), for example:
def my_listener(reader, message):
# process message
pass
* `receiver_queue_size`:
Sets the size of the reader receive queue. The reader receive
queue controls how many messages can be accumulated by the reader
before the application calls `read_next()`. Using a higher value could
potentially increase the reader throughput at the expense of higher
memory utilization.
* `reader_name`:
Sets the reader name.
"""
conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(reader_listener)
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
self._consumers.append(c)
return c

def close(self):
"""
Close the client and all the associated producers and consumers
Expand Down Expand Up @@ -574,3 +661,40 @@ def close(self):
"""
self._consumer.close()
self._client._consumers.remove(self)


class Reader:
"""
Pulsar topic reader.
"""

def topic(self):
"""
Return the topic this reader is reading from.
"""
return self._reader.topic()

def read_next(self, timeout_millis=None):
"""
Read a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
return self._reader.read_next()
else:
return self._reader.read_next(timeout_millis)

def close(self):
"""
Close the reader.
"""
self._reader.close()
self._client._consumers.remove(self)
118 changes: 113 additions & 5 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@

from unittest import TestCase, main
import time
from pulsar import Client, \
from pulsar import Client, MessageId, \
CompressionType, ConsumerType

from _pulsar import ProducerConfiguration, ConsumerConfiguration


class PulsarTest(TestCase):

serviceUrl = 'pulsar://localhost:8885'

def test_producer_config(self):
conf = ProducerConfiguration()
conf.send_timeout_millis(12)
Expand All @@ -51,14 +54,14 @@ def test_consumer_config(self):
self.assertEqual(conf.consumer_name(), "my-name")

def test_simple_producer(self):
client = Client('pulsar://localhost:6650/')
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic')
producer.send('hello')
producer.close()
client.close()

def test_producer_send_async(self):
client = Client('pulsar://localhost:6650/')
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic')

sent_messages = []
Expand All @@ -75,7 +78,7 @@ def send_callback(producer, msg):
client.close()

def test_producer_consumer(self):
client = Client('pulsar://localhost:6650/')
client = Client(self.serviceUrl)
consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-producer-consumer',
'my-sub',
consumer_type=ConsumerType.Shared)
Expand All @@ -95,7 +98,7 @@ def test_producer_consumer(self):
client.close()

def test_message_listener(self):
client = Client('pulsar://localhost:6650/')
client = Client(self.serviceUrl)

received_messages = []

Expand All @@ -120,6 +123,111 @@ def listener(consumer, msg):
self.assertEqual(received_messages[2].data(), "hello-3")
client.close()

def test_reader_simple(self):
client = Client(self.serviceUrl)
reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-simple',
MessageId.earliest)

producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-simple')
producer.send('hello')

msg = reader.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello')

try:
msg = reader.read_next(100)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected

reader.close()
client.close()

def test_reader_on_last_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message')

for i in range(10):
producer.send('hello-%d' % i)

reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message',
MessageId.latest)

for i in range(10, 20):
producer.send('hello-%d' % i)

for i in range(10, 20):
msg = reader.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)

reader.close()
client.close()

def test_reader_on_specific_message(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message')

for i in range(10):
producer.send('hello-%d' % i)

reader1 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message',
MessageId.earliest)

for i in range(5):
msg = reader1.read_next()
last_msg_id = msg.message_id()

reader2 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message',
last_msg_id)

for i in range(5, 10):
msg = reader2.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)

reader1.close()
reader2.close()
client.close()

def test_reader_on_specific_message_with_batches(self):
client = Client(self.serviceUrl)
producer = client.create_producer(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
batching_enabled=True,
batching_max_publish_delay_ms=1000)

for i in range(10):
producer.send_async('hello-%d' % i, None)

# Send one sync message to make sure everything was published
producer.send('hello-10')

reader1 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
MessageId.earliest)

for i in range(5):
msg = reader1.read_next()
last_msg_id = msg.message_id()

reader2 = client.create_reader(
'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches',
last_msg_id)

for i in range(5, 11):
msg = reader2.read_next()
self.assertTrue(msg)
self.assertEqual(msg.data(), 'hello-%d' % i)

reader1.close()
reader2.close()
client.close()


if __name__ == '__main__':
main()
15 changes: 15 additions & 0 deletions pulsar-client-cpp/python/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
return consumer;
}

Reader Client_createReader(Client& client, const std::string& topic,
const BatchMessageId& startMessageId,
const ReaderConfiguration& conf) {
Reader reader;
Result res;

Py_BEGIN_ALLOW_THREADS
res = client.createReader(topic, startMessageId, conf, reader);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
return reader;
}

void Client_close(Client& client) {
Result res;

Expand All @@ -59,6 +73,7 @@ void export_client() {
class_<Client>("Client", init<const std::string&, const ClientConfiguration& >())
.def("create_producer", &Client_createProducer)
.def("subscribe", &Client_subscribe)
.def("create_reader", &Client_createReader)
.def("close", &Client_close)
.def("shutdown", &Client::shutdown)
;
Expand Down
Loading

0 comments on commit 5cf010e

Please sign in to comment.