From ed3a391518f9545e6de8ce296f45592722a11ac6 Mon Sep 17 00:00:00 2001 From: nnmm Date: Thu, 14 Jul 2022 14:59:38 +0200 Subject: [PATCH 1/4] Always add subscription/client/service to the node's list of waitables --- rclrs/src/node.rs | 24 +++++------------------- rclrs/src/node/client.rs | 9 +++++---- rclrs/src/node/service.rs | 10 ++++++---- rclrs/src/node/subscription.rs | 15 ++++++++------- 4 files changed, 24 insertions(+), 34 deletions(-) diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 39dc23c1b..eac72ce8a 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -184,17 +184,11 @@ impl Node { /// /// [1]: crate::Client // TODO: make client's lifetime depend on node's lifetime - pub fn create_client( - &mut self, - topic: &str, - ) -> Result>, RclrsError> + pub fn create_client(&mut self, topic: &str) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { - let client = Arc::new(crate::node::client::Client::::new(self, topic)?); - self.clients - .push(Arc::downgrade(&client) as Weak); - Ok(client) + Client::::new(self, topic) } /// Creates a [`Publisher`][1]. @@ -220,17 +214,12 @@ impl Node { &mut self, topic: &str, callback: F, - ) -> Result>, RclrsError> + ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { - let service = Arc::new(crate::node::service::Service::::new( - self, topic, callback, - )?); - self.services - .push(Arc::downgrade(&service) as Weak); - Ok(service) + Service::::new(self, topic, callback) } /// Creates a [`Subscription`][1]. @@ -247,10 +236,7 @@ impl Node { T: Message, F: FnMut(T) + 'static + Send, { - let subscription = Arc::new(Subscription::::new(self, topic, qos, callback)?); - self.subscriptions - .push(Arc::downgrade(&subscription) as Weak); - Ok(subscription) + Subscription::new(self, topic, qos, callback) } /// Returns the subscriptions that have not been dropped yet. diff --git a/rclrs/src/node/client.rs b/rclrs/src/node/client.rs index 179730521..d655cfc61 100644 --- a/rclrs/src/node/client.rs +++ b/rclrs/src/node/client.rs @@ -70,7 +70,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub fn new(node: &Node, topic: &str) -> Result + pub fn new(node: &mut Node, topic: &str) -> Result, RclrsError> where T: rosidl_runtime_rs::Service, { @@ -107,14 +107,15 @@ where rcl_node_mtx: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - - Ok(Self { + let client = Arc::new(Self { handle, requests: Mutex::new(HashMap::new()), futures: Arc::new(Mutex::new( HashMap::>::new(), )), - }) + }); + node.clients.push(Arc::downgrade(&client) as _); + Ok(client) } /// Sends a request with a callback to be called with the response. diff --git a/rclrs/src/node/service.rs b/rclrs/src/node/service.rs index 968e39027..caae47aba 100644 --- a/rclrs/src/node/service.rs +++ b/rclrs/src/node/service.rs @@ -69,7 +69,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new service. - pub fn new(node: &Node, topic: &str, callback: F) -> Result + pub fn new(node: &mut Node, topic: &str, callback: F) -> Result, RclrsError> where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, @@ -107,11 +107,13 @@ where node_handle: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - - Ok(Self { + let service = Arc::new(Self { handle, callback: Mutex::new(Box::new(callback)), - }) + }); + node.services.push(Arc::downgrade(&service) as _); + + Ok(service) } /// Fetches a new request. diff --git a/rclrs/src/node/subscription.rs b/rclrs/src/node/subscription.rs index 6a4af7902..583732aba 100644 --- a/rclrs/src/node/subscription.rs +++ b/rclrs/src/node/subscription.rs @@ -77,11 +77,11 @@ where { /// Creates a new subscription. pub fn new( - node: &Node, + node: &mut Node, topic: &str, qos: QoSProfile, callback: F, - ) -> Result + ) -> Result, RclrsError> where T: Message, F: FnMut(T) + 'static + Send, @@ -120,12 +120,13 @@ where rcl_node_mtx: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - - Ok(Self { + let subscription = Arc::new(Self { handle, callback: Mutex::new(Box::new(callback)), message: PhantomData, - }) + }); + node.subscriptions.push(Arc::downgrade(&subscription) as _); + Ok(subscription) } /// Returns the topic name of the subscription. @@ -219,9 +220,9 @@ mod tests { fn test_instantiate_subscriber() -> Result<(), RclrsError> { let context = Context::new(vec![]).expect("Context instantiation is expected to be a success"); - let node = create_node(&context, "test_new_subscriber")?; + let mut node = create_node(&context, "test_new_subscriber")?; let _subscriber = Subscription::::new( - &node, + &mut node, "test", QOS_PROFILE_DEFAULT, move |_: std_msgs::msg::String| {}, From c5001db85176546cd284d60dc5f2fb64ee918d93 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 21 Jul 2022 11:45:32 +0200 Subject: [PATCH 2/4] Revert changes. Declare waitables new function with pub(crate) visibility --- rclrs/src/node.rs | 24 +++++++++++++++++++----- rclrs/src/node/client.rs | 9 ++++----- rclrs/src/node/service.rs | 10 ++++------ rclrs/src/node/subscription.rs | 18 ++++++++---------- 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index eac72ce8a..39dc23c1b 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -184,11 +184,17 @@ impl Node { /// /// [1]: crate::Client // TODO: make client's lifetime depend on node's lifetime - pub fn create_client(&mut self, topic: &str) -> Result>, RclrsError> + pub fn create_client( + &mut self, + topic: &str, + ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { - Client::::new(self, topic) + let client = Arc::new(crate::node::client::Client::::new(self, topic)?); + self.clients + .push(Arc::downgrade(&client) as Weak); + Ok(client) } /// Creates a [`Publisher`][1]. @@ -214,12 +220,17 @@ impl Node { &mut self, topic: &str, callback: F, - ) -> Result>, RclrsError> + ) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { - Service::::new(self, topic, callback) + let service = Arc::new(crate::node::service::Service::::new( + self, topic, callback, + )?); + self.services + .push(Arc::downgrade(&service) as Weak); + Ok(service) } /// Creates a [`Subscription`][1]. @@ -236,7 +247,10 @@ impl Node { T: Message, F: FnMut(T) + 'static + Send, { - Subscription::new(self, topic, qos, callback) + let subscription = Arc::new(Subscription::::new(self, topic, qos, callback)?); + self.subscriptions + .push(Arc::downgrade(&subscription) as Weak); + Ok(subscription) } /// Returns the subscriptions that have not been dropped yet. diff --git a/rclrs/src/node/client.rs b/rclrs/src/node/client.rs index d655cfc61..13d5c5232 100644 --- a/rclrs/src/node/client.rs +++ b/rclrs/src/node/client.rs @@ -70,7 +70,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub fn new(node: &mut Node, topic: &str) -> Result, RclrsError> + pub(crate) fn new(node: &Node, topic: &str) -> Result where T: rosidl_runtime_rs::Service, { @@ -107,15 +107,14 @@ where rcl_node_mtx: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - let client = Arc::new(Self { + + Ok(Self { handle, requests: Mutex::new(HashMap::new()), futures: Arc::new(Mutex::new( HashMap::>::new(), )), - }); - node.clients.push(Arc::downgrade(&client) as _); - Ok(client) + }) } /// Sends a request with a callback to be called with the response. diff --git a/rclrs/src/node/service.rs b/rclrs/src/node/service.rs index caae47aba..118c00a7a 100644 --- a/rclrs/src/node/service.rs +++ b/rclrs/src/node/service.rs @@ -69,7 +69,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new service. - pub fn new(node: &mut Node, topic: &str, callback: F) -> Result, RclrsError> + pub(crate) fn new(node: &Node, topic: &str, callback: F) -> Result where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, @@ -107,13 +107,11 @@ where node_handle: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - let service = Arc::new(Self { + + Ok(Self { handle, callback: Mutex::new(Box::new(callback)), - }); - node.services.push(Arc::downgrade(&service) as _); - - Ok(service) + }) } /// Fetches a new request. diff --git a/rclrs/src/node/subscription.rs b/rclrs/src/node/subscription.rs index 583732aba..87848a949 100644 --- a/rclrs/src/node/subscription.rs +++ b/rclrs/src/node/subscription.rs @@ -76,12 +76,12 @@ where T: Message, { /// Creates a new subscription. - pub fn new( - node: &mut Node, + pub(crate) fn new( + node: &Node, topic: &str, qos: QoSProfile, callback: F, - ) -> Result, RclrsError> + ) -> Result where T: Message, F: FnMut(T) + 'static + Send, @@ -120,13 +120,12 @@ where rcl_node_mtx: node.rcl_node_mtx.clone(), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); - let subscription = Arc::new(Self { + + Ok(Self { handle, callback: Mutex::new(Box::new(callback)), message: PhantomData, - }); - node.subscriptions.push(Arc::downgrade(&subscription) as _); - Ok(subscription) + }) } /// Returns the topic name of the subscription. @@ -220,9 +219,8 @@ mod tests { fn test_instantiate_subscriber() -> Result<(), RclrsError> { let context = Context::new(vec![]).expect("Context instantiation is expected to be a success"); - let mut node = create_node(&context, "test_new_subscriber")?; - let _subscriber = Subscription::::new( - &mut node, + let node = create_node(&context, "test_new_subscriber")?; + let _subscriber = node.create_subscription::( "test", QOS_PROFILE_DEFAULT, move |_: std_msgs::msg::String| {}, From 1ced628b333fe5b9c388b91a069cb7d12a0d27cf Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 21 Jul 2022 15:56:07 +0200 Subject: [PATCH 3/4] Document why ::new needs pub(crate) visibility --- rclrs/src/node/client.rs | 5 +++++ rclrs/src/node/service.rs | 5 +++++ rclrs/src/node/subscription.rs | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/rclrs/src/node/client.rs b/rclrs/src/node/client.rs index 13d5c5232..b8c46d2e0 100644 --- a/rclrs/src/node/client.rs +++ b/rclrs/src/node/client.rs @@ -56,6 +56,9 @@ type RequestValue = Box; type RequestId = i64; /// Main class responsible for sending requests to a ROS service. +/// +/// The only available way to instantiate clients is via [`Node::create_client`], this is to +/// ensure that [`Node`]s can track all the clients that have been created. pub struct Client where T: rosidl_runtime_rs::Service, @@ -71,6 +74,8 @@ where { /// Creates a new client. pub(crate) fn new(node: &Node, topic: &str) -> Result + // This uses pub(crate) visibility to avoid instantiating this struct outside + // [`Node::create_client`], see the struct's documentation for the rationale where T: rosidl_runtime_rs::Service, { diff --git a/rclrs/src/node/service.rs b/rclrs/src/node/service.rs index 118c00a7a..5603149dd 100644 --- a/rclrs/src/node/service.rs +++ b/rclrs/src/node/service.rs @@ -55,6 +55,9 @@ type ServiceCallback = Box Response + 'static + Send>; /// Main class responsible for responding to requests sent by ROS clients. +/// +/// The only available way to instantiate services is via [`Node::create_service`], this is to +/// ensure that [`Node`]s can track all the services that have been created. pub struct Service where T: rosidl_runtime_rs::Service, @@ -70,6 +73,8 @@ where { /// Creates a new service. pub(crate) fn new(node: &Node, topic: &str, callback: F) -> Result + // This uses pub(crate) visibility to avoid instantiating this struct outside + // [`Node::create_service`], see the struct's documentation for the rationale where T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, diff --git a/rclrs/src/node/subscription.rs b/rclrs/src/node/subscription.rs index 87848a949..ac80ae5be 100644 --- a/rclrs/src/node/subscription.rs +++ b/rclrs/src/node/subscription.rs @@ -59,6 +59,9 @@ pub trait SubscriptionBase: Send + Sync { /// When a subscription is created, it may take some time to get "matched" with a corresponding /// publisher. /// +/// The only available way to instantiate subscriptions is via [`Node::create_subscription`], this +/// is to ensure that [`Node`]s can track all the subscriptions that have been created. +/// /// [1]: crate::spin_once /// [2]: crate::spin pub struct Subscription @@ -82,6 +85,8 @@ where qos: QoSProfile, callback: F, ) -> Result + // This uses pub(crate) visibility to avoid instantiating this struct outside + // [`Node::create_subscription`], see the struct's documentation for the rationale where T: Message, F: FnMut(T) + 'static + Send, From 53c7ffb225a2e92a66caaff465b2ee5df2b7a2d6 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 21 Jul 2022 16:38:33 +0200 Subject: [PATCH 4/4] Fix clippy issues --- rclrs/src/node/subscription.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclrs/src/node/subscription.rs b/rclrs/src/node/subscription.rs index ac80ae5be..fa27302b8 100644 --- a/rclrs/src/node/subscription.rs +++ b/rclrs/src/node/subscription.rs @@ -218,13 +218,13 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{create_node, Context, Subscription, QOS_PROFILE_DEFAULT}; + use crate::{create_node, Context, QOS_PROFILE_DEFAULT}; #[test] fn test_instantiate_subscriber() -> Result<(), RclrsError> { let context = Context::new(vec![]).expect("Context instantiation is expected to be a success"); - let node = create_node(&context, "test_new_subscriber")?; + let mut node = create_node(&context, "test_new_subscriber")?; let _subscriber = node.create_subscription::( "test", QOS_PROFILE_DEFAULT,