Skip to content

Commit

Permalink
Refactor spin and spin_once to mimic rclcpp's Executor API (ros2-rust…
Browse files Browse the repository at this point in the history
…#324)

* Added rclcpp/rclpy-like executor

* Fix comments
  • Loading branch information
esteve authored and luca-della-vedova committed Sep 13, 2023
1 parent 7e536e0 commit 2fed18e
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 40 deletions.
4 changes: 4 additions & 0 deletions examples/minimal_pub_sub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ path = "src/minimal_subscriber.rs"
name = "minimal_publisher"
path = "src/minimal_publisher.rs"

[[bin]]
name = "minimal_two_nodes"
path = "src/minimal_two_nodes.rs"

[[bin]]
name = "zero_copy_subscriber"
path = "src/zero_copy_subscriber.rs"
Expand Down
77 changes: 77 additions & 0 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::env;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
subscription: None.into(),
});

let minimal_subscriber_aux = Arc::clone(&minimal_subscriber);
let subscription = minimal_subscriber
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
)?;
*minimal_subscriber.subscription.lock().unwrap() = Some(subscription);
Ok(minimal_subscriber)
}

fn callback(&self, msg: std_msgs::msg::String) {
self.num_messages.fetch_add(1, Ordering::SeqCst);
println!("[{}] I heard: '{}'", self.node.name(), msg.data);
println!(
"[{}] (Got {} messages so far)",
self.node.name(),
self.num_messages.load(Ordering::SeqCst)
);
}
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
let mut message = std_msgs::msg::String::default();
let mut publish_count: u32 = 1;
loop {
message.data = format!("Hello, world! {}", publish_count);
println!("Publishing: [{}]", message.data);
publisher.publish(&message)?;
publish_count += 1;
std::thread::sleep(std::time::Duration::from_millis(500));
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
}
81 changes: 81 additions & 0 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::rcl_bindings::rcl_context_is_valid;
use crate::{Node, RclReturnCode, RclrsError, WaitSet};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

/// Single-threaded executor implementation.
pub struct SingleThreadedExecutor {
nodes_mtx: Mutex<Vec<Weak<Node>>>,
}

impl Default for SingleThreadedExecutor {
fn default() -> Self {
Self::new()
}
}

impl SingleThreadedExecutor {
/// Creates a new executor.
pub fn new() -> Self {
SingleThreadedExecutor {
nodes_mtx: Mutex::new(Vec::new()),
}
}

/// Add a node to the executor.
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
Ok(())
}

/// Remove a node from the executor.
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
Ok(())
}

/// Polls the nodes for new messages and executes the corresponding callbacks.
///
/// This function additionally checks that the context is still valid.
pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}
}

Ok(())
}

/// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop.
pub fn spin(&self) -> Result<(), RclrsError> {
while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
match self.spin_once(None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}) => (),
error => return error,
}
}

Ok(())
}
}
45 changes: 8 additions & 37 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod client;
mod clock;
mod context;
mod error;
mod executor;
mod node;
mod parameter;
mod publisher;
Expand All @@ -34,11 +35,11 @@ pub use client::*;
pub use clock::*;
pub use context::*;
pub use error::*;
pub use executor::*;
pub use node::*;
pub use parameter::*;
pub use publisher::*;
pub use qos::*;
use rcl_bindings::rcl_context_is_valid;
pub use rcl_bindings::rmw_request_id_t;
pub use service::*;
pub use subscription::*;
Expand All @@ -57,46 +58,16 @@ pub use wait::*;
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}

Ok(())
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin_once(timeout)
}

/// Convenience function for calling [`spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
// SAFETY: No preconditions for this function.
let context_is_valid = {
let node = Arc::clone(&node);
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
};

while context_is_valid() {
match spin_once(Arc::clone(&node), None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}) => (),
error => return error,
}
}
Ok(())
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin()
}

/// Creates a new node in the empty namespace.
Expand Down
9 changes: 7 additions & 2 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,16 @@ impl Node {
&self,
topic: &str,
qos: QoSProfile,
) -> Result<Publisher<T>, RclrsError>
) -> Result<Arc<Publisher<T>>, RclrsError>
where
T: Message,
{
Publisher::<T>::new(Arc::clone(&self.rcl_node_mtx), topic, qos)
let publisher = Arc::new(Publisher::<T>::new(
Arc::clone(&self.rcl_node_mtx),
topic,
qos,
)?);
Ok(publisher)
}

/// Creates a [`Service`][1].
Expand Down
11 changes: 10 additions & 1 deletion rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,16 @@ impl WaitSet {
// We cannot currently guarantee that the wait sets may not share content, but it is
// mentioned in the doc comment for `add_subscription`.
// Also, the rcl_wait_set is obviously valid.
unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?;
match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() {
Ok(_) => (),
Err(error) => match error {
RclrsError::RclError { code, msg } => match code {
RclReturnCode::WaitSetEmpty => (),
_ => return Err(RclrsError::RclError { code, msg }),
},
_ => return Err(error),
},
}
let mut ready_entities = ReadyEntities {
subscriptions: Vec::new(),
clients: Vec::new(),
Expand Down

0 comments on commit 2fed18e

Please sign in to comment.