@@ -4,15 +4,42 @@ use crate::{Context, RclrsError, ToResult};
44use std:: sync:: { atomic:: AtomicBool , Arc , Mutex , MutexGuard } ;
55
66/// A struct for encapsulating a guard condition - a waitable trigger
7+ ///
8+ /// # Example
9+ /// ```
10+ /// # use rclrs::{Context, GuardCondition, WaitSet, RclrsError};
11+ /// # use std::sync::{Arc, atomic::Ordering};
12+ ///
13+ /// let context = Context::new([])?;
14+ ///
15+ /// let atomic_bool = Arc::new(std::sync::atomic::AtomicBool::new(false));
16+ /// let atomic_bool_for_closure = Arc::clone(&atomic_bool);
17+ ///
18+ /// let gc = Arc::new(GuardCondition::new(
19+ /// &context,
20+ /// Some(Box::new(move || {
21+ /// atomic_bool_for_closure.store(true, Ordering::Relaxed);
22+ /// })),
23+ /// ));
24+ ///
25+ /// let mut ws = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
26+ /// ws.add_guard_condition(Arc::clone(&gc))?;
27+ ///
28+ /// // Trigger the guard condition, firing the callback and waking any wait sets being waited on
29+ /// gc.trigger()?;
30+ ///
31+ /// // The wait call will now immediately return
32+ /// ws.wait(Some(std::time::Duration::from_millis(10)))?;
33+ ///
34+ /// # Ok::<(), RclrsError>(())
35+ /// ```
736pub struct GuardCondition {
837 /// The rcl_guard_condition_t that this struct encapsulates.
938 rcl_guard_condition : Arc < Mutex < rcl_guard_condition_t > > ,
1039 /// An optional callback to call when this guard condition is triggered.
11- callback : Option < Box < dyn Fn ( usize ) > > ,
40+ callback : Option < Box < dyn Fn ( ) + Send > > ,
1241 /// A flag to indicate if this guard condition has already been assigned to a wait set.
1342 pub ( crate ) in_use_by_wait_set : Arc < AtomicBool > ,
14- /// A count for the number of times this guard condition was triggered, but no callback was assigned.
15- unread_count : usize ,
1643}
1744
1845impl Drop for GuardCondition {
@@ -24,12 +51,19 @@ impl Drop for GuardCondition {
2451 }
2552}
2653
54+ // SAFETY: rcl_guard_condition is the only member that doesn't implement Send, and it is designed to be accessed from other threads
55+ unsafe impl Send for GuardCondition { }
56+
57+ // SAFETY: The mutexes and atomic members ensure synchronized access to members, and the callback is reentrant
58+ unsafe impl Sync for GuardCondition { }
59+
2760impl GuardCondition {
2861 /// Creates a new guard condition
29- pub fn new ( context : & Context ) -> Self {
62+ pub fn new ( context : & Context , callback : Option < Box < dyn Fn ( ) + Send > > ) -> Self {
63+ // SAFETY: Getting a zero initialized value is always safe
3064 let mut guard_condition = unsafe { rcl_get_zero_initialized_guard_condition ( ) } ;
3165 unsafe {
32- // SAFETY: The context must be valid. No other preconditions for this function.
66+ // SAFETY: The context must be valid, and the guard condition must be zero-initialized
3367 rcl_guard_condition_init (
3468 & mut guard_condition,
3569 & mut * context. rcl_context_mtx . lock ( ) . unwrap ( ) ,
@@ -39,9 +73,8 @@ impl GuardCondition {
3973
4074 Self {
4175 rcl_guard_condition : Arc :: new ( Mutex :: new ( guard_condition) ) ,
42- callback : None ,
76+ callback,
4377 in_use_by_wait_set : Arc :: new ( AtomicBool :: new ( false ) ) ,
44- unread_count : 0 ,
4578 }
4679 }
4780
@@ -50,32 +83,14 @@ impl GuardCondition {
5083 self . rcl_guard_condition . lock ( ) . unwrap ( )
5184 }
5285
53- /// Sets the callback to call when this guard condition is triggered.
54- pub fn set_on_trigger_callback ( & mut self , callback : Option < Box < dyn Fn ( usize ) > > ) {
55- match callback {
56- Some ( callback) => {
57- if self . unread_count > 0 {
58- callback ( self . unread_count ) ;
59- self . unread_count = 0 ;
60- }
61- self . callback = Some ( callback) ;
62- }
63- None => {
64- self . callback = None ;
65- self . unread_count = 0 ;
66- }
67- }
68- }
69-
7086 /// Triggers this guard condition, activating the wait set, and calling the optionally assigned callback.
71- pub fn trigger ( & mut self ) -> Result < ( ) , RclrsError > {
87+ pub fn trigger ( & self ) -> Result < ( ) , RclrsError > {
7288 unsafe {
7389 // SAFETY: The rcl_guard_condition_t is valid.
7490 rcl_trigger_guard_condition ( & mut * self . rcl_guard_condition . lock ( ) . unwrap ( ) ) . ok ( ) ?;
7591 }
76- match & self . callback {
77- Some ( callback) => callback ( 1 ) ,
78- None => self . unread_count += 1 ,
92+ if self . callback . is_some ( ) {
93+ self . callback . as_ref ( ) . unwrap ( ) ( ) ;
7994 }
8095 Ok ( ( ) )
8196 }
@@ -90,43 +105,44 @@ mod tests {
90105 #[ test]
91106 fn test_guard_condition ( ) -> Result < ( ) , RclrsError > {
92107 let context = Context :: new ( [ ] ) ?;
93- let mut gc = GuardCondition :: new ( & context) ;
94108
95- let atomic_usize = Arc :: new ( std:: sync:: atomic:: AtomicUsize :: new ( 0 ) ) ;
96- let atomic_usize_for_closure = Arc :: clone ( & atomic_usize ) ;
109+ let atomic_bool = Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ;
110+ let atomic_bool_for_closure = Arc :: clone ( & atomic_bool ) ;
97111
98- gc. set_on_trigger_callback ( Some ( Box :: new ( move |count| {
99- atomic_usize_for_closure. store ( count, Ordering :: Relaxed ) ;
100- } ) ) ) ;
112+ let gc = GuardCondition :: new (
113+ & context,
114+ Some ( Box :: new ( move || {
115+ atomic_bool_for_closure. store ( true , Ordering :: Relaxed ) ;
116+ } ) ) ,
117+ ) ;
101118
102119 gc. trigger ( ) ?;
103120
104- assert_eq ! ( atomic_usize . load( Ordering :: Relaxed ) , 1 ) ;
121+ assert_eq ! ( atomic_bool . load( Ordering :: Relaxed ) , true ) ;
105122
106123 Ok ( ( ) )
107124 }
108125
109126 #[ test]
110127 fn test_guard_condition_wait ( ) -> Result < ( ) , RclrsError > {
111128 let context = Context :: new ( [ ] ) ?;
112- let gc = Arc :: new ( Mutex :: new ( GuardCondition :: new ( & context) ) ) ;
113129
114- let atomic_usize = Arc :: new ( std:: sync:: atomic:: AtomicUsize :: new ( 0 ) ) ;
115- let atomic_usize_for_closure = Arc :: clone ( & atomic_usize ) ;
130+ let atomic_bool = Arc :: new ( std:: sync:: atomic:: AtomicBool :: new ( false ) ) ;
131+ let atomic_bool_for_closure = Arc :: clone ( & atomic_bool ) ;
116132
117- gc. lock ( )
118- . unwrap ( )
119- . set_on_trigger_callback ( Some ( Box :: new ( move |count| {
120- atomic_usize_for_closure. store ( count, Ordering :: Relaxed ) ;
121- } ) ) ) ;
133+ let gc = Arc :: new ( GuardCondition :: new (
134+ & context,
135+ Some ( Box :: new ( move || {
136+ atomic_bool_for_closure. store ( true , Ordering :: Relaxed ) ;
137+ } ) ) ,
138+ ) ) ;
122139
123140 let mut ws = WaitSet :: new ( 0 , 1 , 0 , 0 , 0 , 0 , & context) ?;
124141 ws. add_guard_condition ( Arc :: clone ( & gc) ) ?;
125- gc. lock ( ) . unwrap ( ) . trigger ( ) ?;
142+ gc. trigger ( ) ?;
126143
127- assert_eq ! ( atomic_usize. load( Ordering :: Relaxed ) , 1 ) ;
128- let wait_result = ws. wait ( Some ( std:: time:: Duration :: from_millis ( 10 ) ) ) ?;
129- assert_eq ! ( wait_result. guard_conditions. len( ) , 1 ) ;
144+ assert_eq ! ( atomic_bool. load( Ordering :: Relaxed ) , true ) ;
145+ ws. wait ( Some ( std:: time:: Duration :: from_millis ( 10 ) ) ) ?;
130146
131147 Ok ( ( ) )
132148 }
0 commit comments