Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CapnProto Publisher and Subscriber to Python API #1089

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions lang/python/core/ecal/core/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def set_max_bandwidth_udp(self, bandwidth):


class ProtoPublisher(MessagePublisher):
"""Spezialized publisher that sends out protobuf messages
"""Specialized publisher that sends out protobuf messages
"""
def __init__(self, name, type_=None):
if type_ is not None:
Expand All @@ -117,9 +117,25 @@ def send(self, msg, time=-1):
def send_sync(self, msg, time, ack_timeout_ms):
return self.c_publisher.send_sync(msg.SerializeToString(), time, ack_timeout_ms)

class CapnPublisher(MessagePublisher):
"""Specialized publisher that sends out capnproto messages
"""
def __init__(self, name, type_=None):
if type_ is not None:
topic_type = type_._nodeProto.displayName.split("_")[1]
topic_desc = b""
super(CapnPublisher, self).__init__(name, topic_type, topic_desc)
else:
super(CapnPublisher, self).__init__(name)

def send(self, msg, time=-1):
return self.c_publisher.send(msg.to_bytes(), time)

def send_sync(self, msg, time, ack_timeout_ms):
return self.c_publisher.send_sync(msg.to_bytes(), time, ack_timeout_ms)

class StringPublisher(MessagePublisher):
"""Spezialized publisher that sends out plain strings
"""Specialized publisher that sends out plain strings
"""
def __init__(self, name):
topic_type = "base:std::string"
Expand All @@ -132,7 +148,6 @@ def send(self, msg, time=-1):
def send_sync(self, msg, time, ack_timeout_ms):
return self.c_publisher.send_sync(msg.encode(), time, ack_timeout_ms)


if __name__ == '__main__':
"""Test the publisher API
"""
Expand Down
111 changes: 109 additions & 2 deletions lang/python/core/ecal/core/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def rem_callback(self, callback):


class ProtoSubscriber(MessageSubscriber):
"""Spezialized subscriber that subscribes to protobuf messages
"""Specialized subscriber that subscribes to protobuf messages
"""
def __init__(self, name, type_protobuf):
""" receive subscriber content with timeout
Expand Down Expand Up @@ -131,9 +131,116 @@ def _on_receive(self, topic_name, msg, time):
proto_message.ParseFromString(msg)
self.callback(topic_name, proto_message, time)

class ProtoSubscriber(MessageSubscriber):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are now two ProtoSubscriber classes. Can you please remove one of them?

"""Specialized subscriber that subscribes to protobuf messages
"""
def __init__(self, name, type_protobuf):
""" receive subscriber content with timeout

:param name: name on which the subscriber listens to traffic
:type name: string
:param type_protobuf: type of the protobuf object, which the subscriber will receive

"""
topic_type = "proto:" + type_protobuf.DESCRIPTOR.full_name
#topic_desc = pb_helper.get_descriptor_from_type(type_protobuf)
# ProtoSubscriber only takes two arguments, check about subscriber
super(ProtoSubscriber, self).__init__(name, topic_type)
self.protobuf_type = type_protobuf
self.callback = None

def receive(self, timeout=0):
""" receive subscriber content with timeout

:param timeout: receive timeout in ms
:type timeout: int

"""
ret, msg, time = self.c_subscriber.receive(timeout)
proto_message = self.protobuf_type()
if ret > 0:
# parse message content from 'msg'
proto_message.ParseFromString(msg)
return ret, proto_message, time

def set_callback(self, callback):
""" set callback function for incoming messages

:param callback: python callback function (f(topic_name, msg, time))

"""
self.callback = callback
self.c_subscriber.set_callback(self._on_receive)

def rem_callback(self, callback):
""" remove callback function for incoming messages

:param callback: python callback function (f(topic_name, msg, time))

"""
self.c_subscriber.rem_callback(self._on_receive)
self.callback = None

def _on_receive(self, topic_name, msg, time):
proto_message = self.protobuf_type()
proto_message.ParseFromString(msg)
self.callback(topic_name, proto_message, time)

class CapnSubscriber(MessageSubscriber):
"""Specialized subscriber that subscribes to capnproto messages
"""
def __init__(self, name, capn_message_type):
""" receive subscriber content with timeout

:param name: name on which the subscriber listens to traffic
:type name: string
:param capn_message_type: type of the protobuf object, which the subscriber will receive

"""
topic_type = capn_message_type._nodeProto.displayName.split("_")[1]
super(CapnSubscriber, self).__init__(name, topic_type)
self.message_type = capn_message_type
self.callback = None

def receive(self, timeout=0):
""" receive subscriber content with timeout

:param timeout: receive timeout in ms
:type timeout: int

"""
ret, msg, time = self.c_subscriber.receive(timeout)
if ret > 0:
# parse message content from 'msg'
with self.message_type.from_bytes(msg) as capn_message:
return ret, capn_message, time
else:
return ret, self.message_type, time

def set_callback(self, callback):
""" set callback function for incoming messages

:param callback: python callback function (f(topic_name, msg, time))

"""
self.callback = callback
self.c_subscriber.set_callback(self._on_receive)

def rem_callback(self, callback):
""" remove callback function for incoming messages

:param callback: python callback function (f(topic_name, msg, time))

"""
self.c_subscriber.rem_callback(self._on_receive)
self.callback = None

def _on_receive(self, topic_name, msg, time):
with self.message_type.from_bytes(msg) as capn_message:
self.callback(topic_name, capn_message, time)

class StringSubscriber(MessageSubscriber):
"""Spezialized publisher subscribes to plain strings
"""Specialized publisher subscribes to plain strings
"""
def __init__(self, name):
topic_type = "base:std::string"
Expand Down
23 changes: 23 additions & 0 deletions samples/python/capnp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Capnp Publisher and Subscriber Example

Minimal Publisher - publishes an example Imu message
```
python3 pub.py
```

Minimal Subscriber - subscribes to the Imu message
```
python3 sub.py
```

Capnp Message Struct

```
struct Imu {
timestamp @0: Int64;
accel @1: List(Float32);
gyro @2: List(Float32);
}
```


8 changes: 8 additions & 0 deletions samples/python/capnp/messages.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@0xb6249c5df1f7e512;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use the same message like is used for the c++ samples? this way we can communicate the samples with python send / c++ receive and vice versa.


struct Imu {
timestamp @0: Int64;
accel @1: List(Float32);
gyro @2: List(Float32);
}

32 changes: 32 additions & 0 deletions samples/python/capnp/pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import sys
import time
import capnp
import random
import time
import messages_capnp as msgs
import ecal.core.core as ecal_core
from ecal.core.publisher import StringPublisher, MessagePublisher, CapnPublisher

if __name__ == "__main__":
ecal_core.initialize(sys.argv, "CapnProto Message Publisher")

# Create a CapnProto Publisher that publishes on the topic "sensor_imu"
pub = CapnPublisher("sensor_imu", msgs.Imu)

# Infinite loop (using ecal_core.ok() will enable us to gracefully shutdown
# the process from another application)
while ecal_core.ok():
# Create a capnproto Imu message, populate it with random values and publish it to the topic
imu_msg = msgs.Imu.new_message()
imu_msg.timestamp = time.monotonic_ns()
imu_msg.accel = [round(random.uniform(2.0, 20.0), 2), round(random.uniform(2.0, 20.0), 2), round(random.uniform(2.0, 20.0), 2)]
imu_msg.gyro = [round(random.uniform(0.0, 4.3), 2), round(random.uniform(0.0, 4.3), 2), round(random.uniform(0.0, 4.3), 2)]

print("Sending: {}".format(imu_msg))
pub.send(imu_msg)

# Sleep 500 ms
time.sleep(0.5)

# finalize eCAL API
ecal_core.finalize()
28 changes: 28 additions & 0 deletions samples/python/capnp/sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import sys
import time
import capnp
import messages_capnp as msgs
import ecal.core.core as ecal_core
from ecal.core.subscriber import CapnSubscriber

# callback function for the subscriber
def callback(topic_name, msg, time):
# print the received message
print(msg)

if __name__ == "__main__":
# Initialize eCAL
ecal_core.initialize(sys.argv, "CapnProto Message Subscriber")

# Create a subscriber that listenes on the "sensor_imu" topic
sub = CapnSubscriber("sensor_imu", msgs.Imu)

# Set the callback
sub.set_callback(callback)

# infinite loop that keeps the script alive
while ecal_core.ok():
time.sleep(0.5)

# finalize eCAL API
ecal_core.finalize()