Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor spin and spin_once to mimic rclcpp's Executor API #324

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/minimal_pub_sub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ path = "src/minimal_subscriber.rs"
name = "minimal_publisher"
path = "src/minimal_publisher.rs"

[[bin]]
name = "minimal_two_nodes"
path = "src/minimal_two_nodes.rs"

[[bin]]
name = "zero_copy_subscriber"
path = "src/zero_copy_subscriber.rs"
Expand Down
77 changes: 77 additions & 0 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::env;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
subscription: None.into(),
});

let minimal_subscriber_aux = Arc::clone(&minimal_subscriber);
let subscription = minimal_subscriber
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
)?;
*minimal_subscriber.subscription.lock().unwrap() = Some(subscription);
Ok(minimal_subscriber)
}

fn callback(&self, msg: std_msgs::msg::String) {
self.num_messages.fetch_add(1, Ordering::SeqCst);
println!("[{}] I heard: '{}'", self.node.name(), msg.data);
println!(
"[{}] (Got {} messages so far)",
self.node.name(),
self.num_messages.load(Ordering::SeqCst)
);
}
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
let mut message = std_msgs::msg::String::default();
let mut publish_count: u32 = 1;
loop {
message.data = format!("Hello, world! {}", publish_count);
println!("Publishing: [{}]", message.data);
publisher.publish(&message)?;
publish_count += 1;
std::thread::sleep(std::time::Duration::from_millis(500));
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
}
79 changes: 79 additions & 0 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::rcl_bindings::rcl_context_is_valid;
use crate::{Node, RclReturnCode, RclrsError, WaitSet};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

/// Single-threaded executor implementation.
pub struct SingleThreadedExecutor {
nodes_mtx: Mutex<Vec<Weak<Node>>>,
}

impl Default for SingleThreadedExecutor {
fn default() -> Self {
Self::new()
}
}

impl SingleThreadedExecutor {
/// Creates a new executor.
pub fn new() -> Self {
SingleThreadedExecutor {
nodes_mtx: Mutex::new(Vec::new()),
}
}

/// Add a node to the executor.
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
Ok(())
}

/// Remove a node from the executor.
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
Ok(())
}

/// Polls the nodes for new messages and executes the corresponding callbacks.
pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}
}

Ok(())
}

/// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop.
pub fn spin(&self) -> Result<(), RclrsError> {
while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
match self.spin_once(None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}) => (),
error => return error,
}
}

Ok(())
}
}
42 changes: 8 additions & 34 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod arguments;
mod client;
mod context;
mod error;
mod executor;
mod node;
mod parameter;
mod publisher;
Expand All @@ -30,11 +31,11 @@ pub use arguments::*;
pub use client::*;
pub use context::*;
pub use error::*;
pub use executor::*;
pub use node::*;
pub use parameter::*;
pub use publisher::*;
pub use qos::*;
use rcl_bindings::rcl_context_is_valid;
pub use rcl_bindings::rmw_request_id_t;
pub use service::*;
pub use subscription::*;
Expand All @@ -51,46 +52,19 @@ pub use wait::*;
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}

Ok(())
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin_once(timeout)
}

/// Convenience function for calling [`spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
esteve marked this conversation as resolved.
Show resolved Hide resolved
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
// SAFETY: No preconditions for this function.
esteve marked this conversation as resolved.
Show resolved Hide resolved
let context_is_valid = {
let node = Arc::clone(&node);
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
};

while context_is_valid() {
match spin_once(Arc::clone(&node), None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}) => (),
error => return error,
}
}
Ok(())
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin()
}

/// Creates a new node in the empty namespace.
Expand Down
9 changes: 7 additions & 2 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,16 @@ impl Node {
&self,
topic: &str,
qos: QoSProfile,
) -> Result<Publisher<T>, RclrsError>
) -> Result<Arc<Publisher<T>>, RclrsError>
where
T: Message,
{
Publisher::<T>::new(Arc::clone(&self.rcl_node_mtx), topic, qos)
let publisher = Arc::new(Publisher::<T>::new(
Arc::clone(&self.rcl_node_mtx),
topic,
qos,
)?);
Ok(publisher)
}

/// Creates a [`Service`][1].
Expand Down
11 changes: 10 additions & 1 deletion rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,16 @@ impl WaitSet {
// We cannot currently guarantee that the wait sets may not share content, but it is
// mentioned in the doc comment for `add_subscription`.
// Also, the rcl_wait_set is obviously valid.
unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?;
match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() {
Ok(_) => (),
Err(error) => match error {
RclrsError::RclError { code, msg } => match code {
RclReturnCode::WaitSetEmpty => (),
_ => return Err(RclrsError::RclError { code, msg }),
},
_ => return Err(error),
},
}
let mut ready_entities = ReadyEntities {
subscriptions: Vec::new(),
clients: Vec::new(),
Expand Down
Loading