From d82cd55af6b527d63558f08d35fb1d3923313160 Mon Sep 17 00:00:00 2001 From: Shivji Kumar Jha Date: Wed, 28 Oct 2020 14:20:54 +0530 Subject: [PATCH] [Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173) * [Issue 8154] Expose schema version (of writerSchema) in python client message * Adding formating suggestion on PR#8173 to fix tests * Fixing build issues * Added a test for python client returning schema version * Added one more test case for python client returning schema version * Fix test- move subscribe before send so the consumer offset is ahead of new data * Fix test to make it run on python 2 and 3 both Co-authored-by: Sijie Guo --- pulsar/__init__.py | 6 ++++++ schema_test.py | 26 ++++++++++++++++++++++++++ src/message.cc | 7 +++++++ 3 files changed, 39 insertions(+) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 20e7afe..c3c610a 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -220,6 +220,12 @@ def redelivery_count(self): """ return self._message.redelivery_count() + def schema_version(self): + """ + Get the schema version for this message + """ + return self._message.schema_version() + @staticmethod def _wrap(_message): self = Message() diff --git a/schema_test.py b/schema_test.py index 09cfd71..2d03020 100755 --- a/schema_test.py +++ b/schema_test.py @@ -354,6 +354,32 @@ class Example(Record): self.assertEqual(r2.__class__.__name__, 'Example') self.assertEqual(r2, r) + def test_schema_version(self): + class Example(Record): + a = Integer() + b = Integer() + + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-avro-python-schema-version-topic', + schema=AvroSchema(Example)) + + consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1', + schema=AvroSchema(Example)) + + r = Example(a=1, b=2) + producer.send(r) + + msg = consumer.receive() + + self.assertIsNotNone(msg.schema_version()) + + self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode()) + + self.assertEqual(r, msg.value()) + + client.close() + def test_serialize_wrong_types(self): class Example(Record): a = Integer() diff --git a/src/message.cc b/src/message.cc index 719e09e..460a0c7 100644 --- a/src/message.cc +++ b/src/message.cc @@ -82,6 +82,12 @@ std::string Topic_name_str(const Message& msg) { return ss.str(); } +std::string schema_version_str(const Message& msg) { + std::stringstream ss; + ss << msg.getSchemaVersion(); + return ss.str(); +} + const MessageId& Message_getMessageId(const Message& msg) { return msg.getMessageId(); } @@ -168,6 +174,7 @@ void export_message() { .def("__str__", &Message_str) .def("topic_name", &Topic_name_str) .def("redelivery_count", &Message::getRedeliveryCount) + .def("schema_version", &schema_version_str) ; MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;