Skip to content

Commit

Permalink
[Issue 8154] [Python client] Expose schema version (of writerSchema) …
Browse files Browse the repository at this point in the history
…in Message (#8173)

* [Issue 8154] Expose schema version (of writerSchema) in python client message

* Adding formating suggestion on PR#8173 to fix tests

* Fixing build issues

* Added a test for python client returning schema version

* Added one more test case for python client returning schema version

* Fix test- move subscribe before send so the consumer offset is ahead of new data

* Fix test to make it run on python 2 and 3 both

Co-authored-by: Sijie Guo <sijie@apache.org>
  • Loading branch information
shiv4289 and sijie authored Oct 28, 2020
1 parent f2014eb commit d82cd55
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ def redelivery_count(self):
"""
return self._message.redelivery_count()

def schema_version(self):
"""
Get the schema version for this message
"""
return self._message.schema_version()

@staticmethod
def _wrap(_message):
self = Message()
Expand Down
26 changes: 26 additions & 0 deletions schema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,32 @@ class Example(Record):
self.assertEqual(r2.__class__.__name__, 'Example')
self.assertEqual(r2, r)

def test_schema_version(self):
class Example(Record):
a = Integer()
b = Integer()

client = pulsar.Client(self.serviceUrl)
producer = client.create_producer(
'my-avro-python-schema-version-topic',
schema=AvroSchema(Example))

consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
schema=AvroSchema(Example))

r = Example(a=1, b=2)
producer.send(r)

msg = consumer.receive()

self.assertIsNotNone(msg.schema_version())

self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())

self.assertEqual(r, msg.value())

client.close()

def test_serialize_wrong_types(self):
class Example(Record):
a = Integer()
Expand Down
7 changes: 7 additions & 0 deletions src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ std::string Topic_name_str(const Message& msg) {
return ss.str();
}

std::string schema_version_str(const Message& msg) {
std::stringstream ss;
ss << msg.getSchemaVersion();
return ss.str();
}

const MessageId& Message_getMessageId(const Message& msg) {
return msg.getMessageId();
}
Expand Down Expand Up @@ -168,6 +174,7 @@ void export_message() {
.def("__str__", &Message_str)
.def("topic_name", &Topic_name_str)
.def("redelivery_count", &Message::getRedeliveryCount)
.def("schema_version", &schema_version_str)
;

MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;
Expand Down

0 comments on commit d82cd55

Please sign in to comment.