Skip to content

Commit eda4dad

Browse files
committed
Always add subscription/client/service to the node's list of waitables
1 parent a635b0b commit eda4dad

File tree

4 files changed

+24
-34
lines changed

4 files changed

+24
-34
lines changed

rclrs/src/node.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,11 @@ impl Node {
184184
///
185185
/// [1]: crate::Client
186186
// TODO: make client's lifetime depend on node's lifetime
187-
pub fn create_client<T>(
188-
&mut self,
189-
topic: &str,
190-
) -> Result<Arc<crate::node::client::Client<T>>, RclrsError>
187+
pub fn create_client<T>(&mut self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
191188
where
192189
T: rosidl_runtime_rs::Service,
193190
{
194-
let client = Arc::new(crate::node::client::Client::<T>::new(self, topic)?);
195-
self.clients
196-
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
197-
Ok(client)
191+
Client::<T>::new(self, topic)
198192
}
199193

200194
/// Creates a [`Publisher`][1].
@@ -220,17 +214,12 @@ impl Node {
220214
&mut self,
221215
topic: &str,
222216
callback: F,
223-
) -> Result<Arc<crate::node::service::Service<T>>, RclrsError>
217+
) -> Result<Arc<Service<T>>, RclrsError>
224218
where
225219
T: rosidl_runtime_rs::Service,
226220
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
227221
{
228-
let service = Arc::new(crate::node::service::Service::<T>::new(
229-
self, topic, callback,
230-
)?);
231-
self.services
232-
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
233-
Ok(service)
222+
Service::<T>::new(self, topic, callback)
234223
}
235224

236225
/// Creates a [`Subscription`][1].
@@ -247,10 +236,7 @@ impl Node {
247236
T: Message,
248237
F: FnMut(T) + 'static + Send,
249238
{
250-
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
251-
self.subscriptions
252-
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
253-
Ok(subscription)
239+
Subscription::new(self, topic, qos, callback)
254240
}
255241

256242
/// Returns the subscriptions that have not been dropped yet.

rclrs/src/node/client.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ where
7878
T: rosidl_runtime_rs::Service,
7979
{
8080
/// Creates a new client.
81-
pub fn new(node: &Node, topic: &str) -> Result<Self, RclrsError>
81+
pub fn new(node: &mut Node, topic: &str) -> Result<Arc<Self>, RclrsError>
8282
where
8383
T: rosidl_runtime_rs::Service,
8484
{
@@ -114,14 +114,15 @@ where
114114
rcl_client_mtx: Mutex::new(rcl_client),
115115
rcl_node_mtx: node.rcl_node_mtx.clone(),
116116
});
117-
118-
Ok(Self {
117+
let client = Arc::new(Self {
119118
handle,
120119
requests: Mutex::new(HashMap::new()),
121120
futures: Arc::new(Mutex::new(
122121
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
123122
)),
124-
})
123+
});
124+
node.clients.push(Arc::downgrade(&client) as _);
125+
Ok(client)
125126
}
126127

127128
/// Sends a request with a callback to be called with the response.

rclrs/src/node/service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ where
6767
T: rosidl_runtime_rs::Service,
6868
{
6969
/// Creates a new service.
70-
pub fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
70+
pub fn new<F>(node: &mut Node, topic: &str, callback: F) -> Result<Arc<Self>, RclrsError>
7171
where
7272
T: rosidl_runtime_rs::Service,
7373
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
@@ -104,11 +104,13 @@ where
104104
handle: Mutex::new(service_handle),
105105
node_handle: node.rcl_node_mtx.clone(),
106106
});
107-
108-
Ok(Self {
107+
let service = Arc::new(Self {
109108
handle,
110109
callback: Mutex::new(Box::new(callback)),
111-
})
110+
});
111+
node.services.push(Arc::downgrade(&service) as _);
112+
113+
Ok(service)
112114
}
113115

114116
/// Fetches a new request.

rclrs/src/node/subscription.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ where
7575
{
7676
/// Creates a new subscription.
7777
pub fn new<F>(
78-
node: &Node,
78+
node: &mut Node,
7979
topic: &str,
8080
qos: QoSProfile,
8181
callback: F,
82-
) -> Result<Self, RclrsError>
82+
) -> Result<Arc<Self>, RclrsError>
8383
where
8484
T: Message,
8585
F: FnMut(T) + 'static + Send,
@@ -117,12 +117,13 @@ where
117117
rcl_subscription_mtx: Mutex::new(rcl_subscription),
118118
rcl_node_mtx: node.rcl_node_mtx.clone(),
119119
});
120-
121-
Ok(Self {
120+
let subscription = Arc::new(Self {
122121
handle,
123122
callback: Mutex::new(Box::new(callback)),
124123
message: PhantomData,
125-
})
124+
});
125+
node.subscriptions.push(Arc::downgrade(&subscription) as _);
126+
Ok(subscription)
126127
}
127128

128129
/// Returns the topic name of the subscription.
@@ -216,9 +217,9 @@ mod tests {
216217
fn test_instantiate_subscriber() -> Result<(), RclrsError> {
217218
let context =
218219
Context::new(vec![]).expect("Context instantiation is expected to be a success");
219-
let node = create_node(&context, "test_new_subscriber")?;
220+
let mut node = create_node(&context, "test_new_subscriber")?;
220221
let _subscriber = Subscription::<std_msgs::msg::String>::new(
221-
&node,
222+
&mut node,
222223
"test",
223224
QOS_PROFILE_DEFAULT,
224225
move |_: std_msgs::msg::String| {},

0 commit comments

Comments
 (0)