From 674a53d00b08153c2f7fdf3f72bb27bcc8d378b1 Mon Sep 17 00:00:00 2001 From: crux Date: Thu, 4 May 2023 01:31:33 +0530 Subject: [PATCH 1/4] Add CapnPublisher to python binding --- lang/python/core/ecal/core/publisher.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lang/python/core/ecal/core/publisher.py b/lang/python/core/ecal/core/publisher.py index ad5c535d4f..fd37f8eff9 100644 --- a/lang/python/core/ecal/core/publisher.py +++ b/lang/python/core/ecal/core/publisher.py @@ -101,7 +101,7 @@ def set_max_bandwidth_udp(self, bandwidth): class ProtoPublisher(MessagePublisher): - """Spezialized publisher that sends out protobuf messages + """Specialized publisher that sends out protobuf messages """ def __init__(self, name, type_=None): if type_ is not None: @@ -117,9 +117,25 @@ def send(self, msg, time=-1): def send_sync(self, msg, time, ack_timeout_ms): return self.c_publisher.send_sync(msg.SerializeToString(), time, ack_timeout_ms) +class CapnPublisher(MessagePublisher): + """Specialized publisher that sends out capnproto messages + """ + def __init__(self, name, type_=None): + if type_ is not None: + topic_type = type_._nodeProto.displayName + topic_desc = b"" + super(CapnPublisher, self).__init__(name, topic_type, topic_desc) + else: + super(CapnPublisher, self).__init__(name) + + def send(self, msg, time=-1): + return self.c_publisher.send(msg.to_bytes(), time) + + def send_sync(self, msg, time, ack_timeout_ms): + return self.c_publisher.send_sync(msg.to_bytes(), time, ack_timeout_ms) class StringPublisher(MessagePublisher): - """Spezialized publisher that sends out plain strings + """Specialized publisher that sends out plain strings """ def __init__(self, name): topic_type = "base:std::string" @@ -132,7 +148,6 @@ def send(self, msg, time=-1): def send_sync(self, msg, time, ack_timeout_ms): return self.c_publisher.send_sync(msg.encode(), time, ack_timeout_ms) - if __name__ == '__main__': """Test the publisher API """ From b6928a006079058c36d289b8690746daa4e5bce0 Mon Sep 17 00:00:00 2001 From: crux Date: Thu, 4 May 2023 02:00:04 +0530 Subject: [PATCH 2/4] Add CapnSubsriber to python binding --- lang/python/core/ecal/core/subscriber.py | 111 ++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/lang/python/core/ecal/core/subscriber.py b/lang/python/core/ecal/core/subscriber.py index 1fa0bb8a08..1d88825ca5 100644 --- a/lang/python/core/ecal/core/subscriber.py +++ b/lang/python/core/ecal/core/subscriber.py @@ -77,7 +77,7 @@ def rem_callback(self, callback): class ProtoSubscriber(MessageSubscriber): - """Spezialized subscriber that subscribes to protobuf messages + """Specialized subscriber that subscribes to protobuf messages """ def __init__(self, name, type_protobuf): """ receive subscriber content with timeout @@ -131,9 +131,116 @@ def _on_receive(self, topic_name, msg, time): proto_message.ParseFromString(msg) self.callback(topic_name, proto_message, time) +class ProtoSubscriber(MessageSubscriber): + """Specialized subscriber that subscribes to protobuf messages + """ + def __init__(self, name, type_protobuf): + """ receive subscriber content with timeout + + :param name: name on which the subscriber listens to traffic + :type name: string + :param type_protobuf: type of the protobuf object, which the subscriber will receive + + """ + topic_type = "proto:" + type_protobuf.DESCRIPTOR.full_name + #topic_desc = pb_helper.get_descriptor_from_type(type_protobuf) + # ProtoSubscriber only takes two arguments, check about subscriber + super(ProtoSubscriber, self).__init__(name, topic_type) + self.protobuf_type = type_protobuf + self.callback = None + + def receive(self, timeout=0): + """ receive subscriber content with timeout + + :param timeout: receive timeout in ms + :type timeout: int + + """ + ret, msg, time = self.c_subscriber.receive(timeout) + proto_message = self.protobuf_type() + if ret > 0: + # parse message content from 'msg' + proto_message.ParseFromString(msg) + return ret, proto_message, time + + def set_callback(self, callback): + """ set callback function for incoming messages + + :param callback: python callback function (f(topic_name, msg, time)) + + """ + self.callback = callback + self.c_subscriber.set_callback(self._on_receive) + + def rem_callback(self, callback): + """ remove callback function for incoming messages + + :param callback: python callback function (f(topic_name, msg, time)) + + """ + self.c_subscriber.rem_callback(self._on_receive) + self.callback = None + + def _on_receive(self, topic_name, msg, time): + proto_message = self.protobuf_type() + proto_message.ParseFromString(msg) + self.callback(topic_name, proto_message, time) + +class CapnSubscriber(MessageSubscriber): + """Specialized subscriber that subscribes to capnproto messages + """ + def __init__(self, name, capn_message_type): + """ receive subscriber content with timeout + + :param name: name on which the subscriber listens to traffic + :type name: string + :param capn_message_type: type of the protobuf object, which the subscriber will receive + + """ + topic_type = capn_message_type._nodeProto.displayName + super(CapnSubscriber, self).__init__(name, topic_type) + self.message_type = capn_message_type + self.callback = None + + def receive(self, timeout=0): + """ receive subscriber content with timeout + + :param timeout: receive timeout in ms + :type timeout: int + + """ + ret, msg, time = self.c_subscriber.receive(timeout) + if ret > 0: + # parse message content from 'msg' + with self.message_type.from_bytes(msg) as capn_message: + return ret, capn_message, time + else: + return ret, self.message_type, time + + def set_callback(self, callback): + """ set callback function for incoming messages + + :param callback: python callback function (f(topic_name, msg, time)) + + """ + self.callback = callback + self.c_subscriber.set_callback(self._on_receive) + + def rem_callback(self, callback): + """ remove callback function for incoming messages + + :param callback: python callback function (f(topic_name, msg, time)) + + """ + self.c_subscriber.rem_callback(self._on_receive) + self.callback = None + + def _on_receive(self, topic_name, msg, time): + with self.message_type.from_bytes(msg) as capn_message: + self.callback(topic_name, capn_message, time) class StringSubscriber(MessageSubscriber): - """Spezialized publisher subscribes to plain strings + """Specialized publisher subscribes to plain strings """ def __init__(self, name): topic_type = "base:std::string" From 0849ff734c69ae90e5e99b39f034dacba281cbfc Mon Sep 17 00:00:00 2001 From: crux Date: Thu, 4 May 2023 21:42:22 +0530 Subject: [PATCH 3/4] Fix : make topic type consistent with C++ --- lang/python/core/ecal/core/publisher.py | 2 +- lang/python/core/ecal/core/subscriber.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lang/python/core/ecal/core/publisher.py b/lang/python/core/ecal/core/publisher.py index fd37f8eff9..8e49a765cd 100644 --- a/lang/python/core/ecal/core/publisher.py +++ b/lang/python/core/ecal/core/publisher.py @@ -122,7 +122,7 @@ class CapnPublisher(MessagePublisher): """ def __init__(self, name, type_=None): if type_ is not None: - topic_type = type_._nodeProto.displayName + topic_type = type_._nodeProto.displayName.split("_")[1] topic_desc = b"" super(CapnPublisher, self).__init__(name, topic_type, topic_desc) else: diff --git a/lang/python/core/ecal/core/subscriber.py b/lang/python/core/ecal/core/subscriber.py index 1d88825ca5..ae1281651e 100644 --- a/lang/python/core/ecal/core/subscriber.py +++ b/lang/python/core/ecal/core/subscriber.py @@ -197,7 +197,7 @@ def __init__(self, name, capn_message_type): :param capn_message_type: type of the protobuf object, which the subscriber will receive """ - topic_type = capn_message_type._nodeProto.displayName + topic_type = capn_message_type._nodeProto.displayName("_")[0] super(CapnSubscriber, self).__init__(name, topic_type) self.message_type = capn_message_type self.callback = None From 68744bd44a09785c189b4410c9ea4e7972135413 Mon Sep 17 00:00:00 2001 From: crux Date: Sun, 7 May 2023 15:34:18 +0530 Subject: [PATCH 4/4] Add capnp python examples --- samples/python/capnp/README.md | 19 +++++++++++++++++ samples/python/capnp/messages.capnp | 8 ++++++++ samples/python/capnp/pub.py | 32 +++++++++++++++++++++++++++++ samples/python/capnp/sub.py | 28 +++++++++++++++++++++++++ 4 files changed, 87 insertions(+) create mode 100644 samples/python/capnp/README.md create mode 100644 samples/python/capnp/messages.capnp create mode 100644 samples/python/capnp/pub.py create mode 100644 samples/python/capnp/sub.py diff --git a/samples/python/capnp/README.md b/samples/python/capnp/README.md new file mode 100644 index 0000000000..a5cece1f7d --- /dev/null +++ b/samples/python/capnp/README.md @@ -0,0 +1,19 @@ +## Capnp Publisher and Subscriber Example + +Minimal Publisher - publishes an example Imu message +`python3 pub.py` + +Minimal Subscriber - subscribes to the Imu message +`python3 sub.py` + +Capnp Message Struct + +``` +struct Imu { + timestamp @0: Int64; + accel @1: List(Float32); + gyro @2: List(Float32); +} +``` + + diff --git a/samples/python/capnp/messages.capnp b/samples/python/capnp/messages.capnp new file mode 100644 index 0000000000..35f711cf10 --- /dev/null +++ b/samples/python/capnp/messages.capnp @@ -0,0 +1,8 @@ +@0xb6249c5df1f7e512; + +struct Imu { + timestamp @0: Int64; + accel @1: List(Float32); + gyro @2: List(Float32); +} + diff --git a/samples/python/capnp/pub.py b/samples/python/capnp/pub.py new file mode 100644 index 0000000000..84068a97f2 --- /dev/null +++ b/samples/python/capnp/pub.py @@ -0,0 +1,32 @@ +import sys +import time +import capnp +import random +import time +import messages_capnp as msgs +import ecal.core.core as ecal_core +from ecal.core.publisher import StringPublisher, MessagePublisher, CapnPublisher + +if __name__ == "__main__": + ecal_core.initialize(sys.argv, "CapnProto Message Publisher") + + # Create a CapnProto Publisher that publishes on the topic "sensor_imu" + pub = CapnPublisher("sensor_imu", msgs.Imu) + + # Infinite loop (using ecal_core.ok() will enable us to gracefully shutdown + # the process from another application) + while ecal_core.ok(): + # Create a capnproto Imu message, populate it with random values and publish it to the topic + imu_msg = msgs.Imu.new_message() + imu_msg.timestamp = time.monotonic_ns() + imu_msg.accel = [round(random.uniform(2.0, 20.0), 2), round(random.uniform(2.0, 20.0), 2), round(random.uniform(2.0, 20.0), 2)] + imu_msg.gyro = [round(random.uniform(0.0, 4.3), 2), round(random.uniform(0.0, 4.3), 2), round(random.uniform(0.0, 4.3), 2)] + + print("Sending: {}".format(imu_msg)) + pub.send(imu_msg) + + # Sleep 500 ms + time.sleep(0.5) + + # finalize eCAL API + ecal_core.finalize() diff --git a/samples/python/capnp/sub.py b/samples/python/capnp/sub.py new file mode 100644 index 0000000000..355ac71250 --- /dev/null +++ b/samples/python/capnp/sub.py @@ -0,0 +1,28 @@ +import sys +import time +import capnp +import messages_capnp as msgs +import ecal.core.core as ecal_core +from ecal.core.subscriber import MessageSubscriber, CapnSubscriber + +# callback function for the subscriber +def callback(topic_name, msg, time): + # print the received message + print(msg) + +if __name__ == "__main__": + # Initialize eCAL + ecal_core.initialize(sys.argv, "CapnProto Message Subscriber") + + # Create a subscriber that listenes on the "sensor_imu" topic + sub = CapnSubscriber("sensor_imu", msgs.Imu) + + # Set the callback + sub.set_callback(callback) + + # infinite loop that keeps the script alive + while ecal_core.ok(): + time.sleep(0.5) + + # finalize eCAL API + ecal_core.finalize()