Skip to content

Commit 1e01cd4

Browse files
Adds Guard Conditions
- Adds the Guard Condition struct encapsulating rcl_guard_condition_t. - Adds optional callbacks and a trigger method to approximate the rclcpp implementation. - Adds an `add_guard_condition` method to WaitSet to add the GuardCondition to the WaitSet
1 parent ec85565 commit 1e01cd4

File tree

2 files changed

+195
-0
lines changed

2 files changed

+195
-0
lines changed

rclrs/src/wait.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
mod exclusivity_guard;
27+
mod guard_condition;
2728
use exclusivity_guard::*;
29+
pub use guard_condition::*;
2830

2931
/// A struct for waiting on subscriptions and other waitable entities to become ready.
3032
pub struct WaitSet {
@@ -36,6 +38,8 @@ pub struct WaitSet {
3638
// even in the error case.
3739
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
3840
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41+
// The guard conditions that are currently registered in the wait set.
42+
guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
3943
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
4044
}
4145

@@ -105,6 +109,7 @@ impl WaitSet {
105109
rcl_wait_set,
106110
_rcl_context_mtx: context.rcl_context_mtx.clone(),
107111
subscriptions: Vec::new(),
112+
guard_conditions: Vec::new(),
108113
clients: Vec::new(),
109114
services: Vec::new(),
110115
})
@@ -116,6 +121,7 @@ impl WaitSet {
116121
/// [`WaitSet::new`].
117122
pub fn clear(&mut self) {
118123
self.subscriptions.clear();
124+
self.guard_conditions.clear();
119125
self.clients.clear();
120126
self.services.clear();
121127
// This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
@@ -159,6 +165,38 @@ impl WaitSet {
159165
Ok(())
160166
}
161167

168+
/// Adds a guard condition to the wait set.
169+
///
170+
/// # Errors
171+
/// - If the guard condition was already added to this wait set or another one,
172+
/// [`AlreadyAddedToWaitSet`][1] will be returned
173+
/// - If the number of guard conditions in the wait set is larger than the
174+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
175+
///
176+
/// [1]: crate::RclrsError
177+
/// [2]: crate::RclReturnCode
178+
pub fn add_guard_condition(
179+
&mut self,
180+
guard_condition: Arc<GuardCondition>,
181+
) -> Result<(), RclrsError> {
182+
let exclusive_guard_condition = ExclusivityGuard::new(
183+
Arc::clone(&guard_condition),
184+
Arc::clone(&guard_condition.in_use_by_wait_set),
185+
)?;
186+
187+
unsafe {
188+
// SAFETY: Safe if the wait set and guard condition are initialized
189+
rcl_wait_set_add_guard_condition(
190+
&mut self.rcl_wait_set,
191+
&*guard_condition.rcl_guard_condition.lock().unwrap(),
192+
std::ptr::null_mut(),
193+
)
194+
.ok()?;
195+
}
196+
self.guard_conditions.push(exclusive_guard_condition);
197+
Ok(())
198+
}
199+
162200
/// Adds a client to the wait set.
163201
///
164202
/// # Errors
@@ -275,6 +313,7 @@ impl WaitSet {
275313
.push(Arc::clone(&subscription.waitable));
276314
}
277315
}
316+
278317
for (i, client) in self.clients.iter().enumerate() {
279318
// SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
280319
// equivalent to

rclrs/src/wait/guard_condition.rs

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use crate::rcl_bindings::*;
2+
use crate::{Context, RclrsError, ToResult};
3+
4+
use std::sync::{atomic::AtomicBool, Arc, Mutex};
5+
6+
/// A waitiable entity used for waking up a wait set manually
7+
///
8+
/// # Example
9+
/// ```
10+
/// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError};
11+
/// # use std::sync::{Arc, atomic::Ordering};
12+
///
13+
/// let context = Context::new([])?;
14+
///
15+
/// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
16+
/// let atomic_bool_for_closure = Arc::clone(&atomic_bool);
17+
///
18+
/// let gc = Arc::new(GuardCondition::new(
19+
/// &context,
20+
/// Some(Box::new(move || {
21+
/// atomic_bool_for_closure.store(true, Ordering::Relaxed);
22+
/// })),
23+
/// ));
24+
///
25+
/// let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
26+
/// ws.add_guard_condition(Arc::clone(&gc))?;
27+
///
28+
/// // Trigger the guard condition, firing the callback and waking the wait set being waited on, if any.
29+
/// gc.trigger()?;
30+
///
31+
/// // The provided callback has now been called.
32+
/// assert_eq!(atomic_bool.load(Ordering::Relaxed), true);
33+
///
34+
/// // The wait call will now immediately return.
35+
/// ws.wait(Some(std::time::Duration::from_millis(10)))?;
36+
///
37+
/// # Ok::<(), RclrsError>(())
38+
/// ```
39+
pub struct GuardCondition {
40+
/// The rcl_guard_condition_t that this struct encapsulates.
41+
pub(crate) rcl_guard_condition: Arc<Mutex<rcl_guard_condition_t>>,
42+
/// An optional callback to call when this guard condition is triggered.
43+
callback: Option<Box<dyn Fn() + Send + Sync>>,
44+
/// A flag to indicate if this guard condition has already been assigned to a wait set.
45+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
46+
}
47+
48+
impl Drop for GuardCondition {
49+
fn drop(&mut self) {
50+
unsafe {
51+
// SAFETY: No precondition for this function (besides passing in a valid guard condition)
52+
rcl_guard_condition_fini(&mut *self.rcl_guard_condition.lock().unwrap());
53+
}
54+
}
55+
}
56+
57+
// SAFETY: rcl_guard_condition is the only member that doesn't implement Send, and it is designed to be accessed from other threads
58+
unsafe impl Send for rcl_guard_condition_t {}
59+
60+
// SAFETY: The mutexes and atomic members ensure synchronized access to members, and the callback is reentrant
61+
unsafe impl Sync for GuardCondition {}
62+
63+
impl GuardCondition {
64+
/// Creates a new guard condition.
65+
pub fn new(context: &Context, callback: Option<Box<dyn Fn() + Send + Sync>>) -> Arc<Self> {
66+
// SAFETY: Getting a zero initialized value is always safe
67+
let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition() };
68+
unsafe {
69+
// SAFETY: The context must be valid, and the guard condition must be zero-initialized
70+
rcl_guard_condition_init(
71+
&mut guard_condition,
72+
&mut *context.rcl_context_mtx.lock().unwrap(),
73+
rcl_guard_condition_get_default_options(),
74+
);
75+
}
76+
77+
Arc::new(Self {
78+
rcl_guard_condition: Arc::new(Mutex::new(guard_condition)),
79+
callback,
80+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
81+
})
82+
}
83+
84+
/// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
85+
pub fn trigger(&self) -> Result<(), RclrsError> {
86+
unsafe {
87+
// SAFETY: The rcl_guard_condition_t is valid.
88+
rcl_trigger_guard_condition(&mut *self.rcl_guard_condition.lock().unwrap()).ok()?;
89+
}
90+
if let Some(callback) = &self.callback {
91+
callback();
92+
}
93+
Ok(())
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod tests {
99+
use super::*;
100+
use crate::WaitSet;
101+
use std::sync::atomic::Ordering;
102+
103+
#[test]
104+
fn test_guard_condition() -> Result<(), RclrsError> {
105+
let context = Context::new([])?;
106+
107+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
108+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
109+
110+
let guard_condition = GuardCondition::new(
111+
&context,
112+
Some(Box::new(move || {
113+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
114+
})),
115+
);
116+
117+
guard_condition.trigger()?;
118+
119+
assert!(atomic_bool.load(Ordering::Relaxed));
120+
121+
Ok(())
122+
}
123+
124+
#[test]
125+
fn test_guard_condition_wait() -> Result<(), RclrsError> {
126+
let context = Context::new([])?;
127+
128+
let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
129+
let atomic_bool_for_closure = Arc::clone(&atomic_bool);
130+
131+
let guard_condition = GuardCondition::new(
132+
&context,
133+
Some(Box::new(move || {
134+
atomic_bool_for_closure.store(true, Ordering::Relaxed);
135+
})),
136+
);
137+
138+
let mut wait_set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
139+
wait_set.add_guard_condition(Arc::clone(&guard_condition))?;
140+
guard_condition.trigger()?;
141+
142+
assert!(atomic_bool.load(Ordering::Relaxed));
143+
wait_set.wait(Some(std::time::Duration::from_millis(10)))?;
144+
145+
Ok(())
146+
}
147+
148+
fn assert_send<T: Send>() {}
149+
fn assert_sync<T: Sync>() {}
150+
151+
#[test]
152+
fn test_guard_condition_is_send_and_sync() {
153+
assert_send::<GuardCondition>();
154+
assert_sync::<GuardCondition>();
155+
}
156+
}

0 commit comments

Comments
 (0)