diff --git a/Cargo.toml b/Cargo.toml index 8cc82a8..b09450a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dispatch" -version = "0.2.0" -authors = ["Steven Sheldon"] +version = "0.2.2" +authors = ["Steven Sheldon", "Sunip K. Mukherjee"] edition = "2018" description = "Rust wrapper for Apple's Grand Central Dispatch." diff --git a/README.md b/README.md index 42bda2e..907bee5 100644 --- a/README.md +++ b/README.md @@ -42,3 +42,34 @@ assert!(nums == [2, 3]); let nums = queue.map(nums, |x| x.to_string()); assert!(nums[0] == "2"); ``` + +# Timer Events + +GCD provides a timer facility that can be used to schedule blocks of code to +execute periodically, starting after a delay. The `TimerNode` type is a wrapper +around a dispatch source that can be used to schedule timer events. + +`TimerNode` has the `schedule` method to schedule a timer event, the `update` +method to update the timer's interval, and the `cancel` method to cancel the +timer. Dropping a `TimerNode` will cancel the timer. + +```rust +use dispatch::TimerNode; +use std::time::Duration; +use std::thread::sleep; +use std::sync::{Arc, Mutex}; + +let count = Arc::new(Mutex::new(0)); +let count_clone = count.clone(); +let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! -> {}", *count); +}).unwrap(); +sleep(Duration::from_millis(100)); +timer.update(Duration::from_millis(20), Duration::from_secs(0), None); // change the time period +sleep(Duration::from_millis(100)); +drop(timer); // cancel the timer +println!("Counter: {}", *count.lock().unwrap()); +assert!(*count.lock().unwrap() >= 15); +``` \ No newline at end of file diff --git a/examples/main.rs b/examples/main.rs index c92dcf1..6841a7e 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -1,8 +1,8 @@ extern crate dispatch; +use dispatch::{Queue, QueuePriority}; use std::io; use std::process::exit; -use dispatch::{Queue, QueuePriority}; /// Prompts for a number and adds it to the given sum. /// diff --git a/src/ffi.rs b/src/ffi.rs index 91dad6f..3e8225d 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -4,13 +4,17 @@ use std::os::raw::{c_char, c_long, c_ulong, c_void}; #[repr(C)] -pub struct dispatch_object_s { _private: [u8; 0] } +pub struct dispatch_object_s { + _private: [u8; 0], +} // dispatch_block_t -pub type dispatch_function_t = extern fn(*mut c_void); +pub type dispatch_function_t = extern "C" fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; pub type dispatch_group_t = *mut dispatch_object_s; pub type dispatch_object_t = *mut dispatch_object_s; +pub type dispatch_source_t = *mut dispatch_object_s; +pub type dispatch_source_type_t = *const dispatch_object_s; pub type dispatch_once_t = c_long; pub type dispatch_queue_t = *mut dispatch_object_s; pub type dispatch_time_t = u64; @@ -25,39 +29,88 @@ pub type dispatch_time_t = u64; // dispatch_io_interval_flags_t pub type dispatch_queue_attr_t = *const dispatch_object_s; -#[cfg_attr(any(target_os = "macos", target_os = "ios"), - link(name = "System", kind = "dylib"))] -#[cfg_attr(not(any(target_os = "macos", target_os = "ios")), - link(name = "dispatch", kind = "dylib"))] -extern { +#[cfg_attr( + any(target_os = "macos", target_os = "ios"), + link(name = "System", kind = "dylib") +)] +#[cfg_attr( + not(any(target_os = "macos", target_os = "ios")), + link(name = "dispatch", kind = "dylib") +)] +extern "C" { static _dispatch_main_q: dispatch_object_s; static _dispatch_queue_attr_concurrent: dispatch_object_s; + static _dispatch_source_type_data_add: dispatch_object_s; + static _dispatch_source_type_data_or: dispatch_object_s; + static _dispatch_source_type_mach_recv: dispatch_object_s; + static _dispatch_source_type_mach_send: dispatch_object_s; + static _dispatch_source_type_proc: dispatch_object_s; + static _dispatch_source_type_read: dispatch_object_s; + static _dispatch_source_type_signal: dispatch_object_s; + static _dispatch_source_type_timer: dispatch_object_s; + static _dispatch_source_type_vnode: dispatch_object_s; + static _dispatch_source_type_write: dispatch_object_s; pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t; - pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t; + pub fn dispatch_queue_create( + label: *const c_char, + attr: dispatch_queue_attr_t, + ) -> dispatch_queue_t; // dispatch_queue_attr_t dispatch_queue_attr_make_with_qos_class ( dispatch_queue_attr_t attr, dispatch_qos_class_t qos_class, int relative_priority ); pub fn dispatch_queue_get_label(queue: dispatch_queue_t) -> *const c_char; pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t); pub fn dispatch_main(); // void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_after_f(when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_after_f( + when: dispatch_time_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) ); - pub fn dispatch_apply_f(iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern fn(*mut c_void, usize)); + pub fn dispatch_apply_f( + iterations: usize, + queue: dispatch_queue_t, + context: *mut c_void, + work: extern "C" fn(*mut c_void, usize), + ); // void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block ); - pub fn dispatch_once_f(predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t); + pub fn dispatch_once_f( + predicate: *mut dispatch_once_t, + context: *mut c_void, + function: dispatch_function_t, + ); // void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_async_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_async_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_create() -> dispatch_group_t; pub fn dispatch_group_enter(group: dispatch_group_t); pub fn dispatch_group_leave(group: dispatch_group_t); // void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_notify_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_notify_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_wait(group: dispatch_group_t, timeout: dispatch_time_t) -> c_long; pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void; @@ -70,15 +123,28 @@ extern { pub fn dispatch_semaphore_create(value: c_long) -> dispatch_semaphore_t; pub fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> c_long; - pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long; + pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) + -> c_long; // void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - - // void dispatch_source_cancel ( dispatch_source_t source ); - // dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue ); + pub fn dispatch_barrier_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); + pub fn dispatch_source_cancel(source: dispatch_source_t); + pub fn dispatch_source_create( + type_: dispatch_source_type_t, + handle: *const c_void, + mask: c_ulong, + queue: dispatch_queue_t, + ) -> dispatch_object_t; // unsigned long dispatch_source_get_data ( dispatch_source_t source ); // uintptr_t dispatch_source_get_handle ( dispatch_source_t source ); // unsigned long dispatch_source_get_mask ( dispatch_source_t source ); @@ -86,10 +152,21 @@ extern { // void dispatch_source_set_registration_handler ( dispatch_source_t source, dispatch_block_t handler ); // void dispatch_source_set_registration_handler_f ( dispatch_source_t source, dispatch_function_t handler ); // void dispatch_source_set_cancel_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_cancel_handler_f ( dispatch_source_t source, dispatch_function_t handler ); + pub fn dispatch_source_set_cancel_handler_f( + source: dispatch_source_t, + handler: dispatch_function_t, + ); // void dispatch_source_set_event_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_event_handler_f ( dispatch_source_t source, dispatch_function_t handler ); - // void dispatch_source_set_timer ( dispatch_source_t source, dispatch_time_t start, uint64_t interval, uint64_t leeway ); + pub fn dispatch_source_set_event_handler_f( + source: dispatch_source_t, + handler: dispatch_function_t, + ); + pub fn dispatch_source_set_timer( + source: dispatch_source_t, + start: dispatch_time_t, + interval: u64, + leeway: u64, + ); // long dispatch_source_testcancel ( dispatch_source_t source ); // void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); @@ -136,14 +213,31 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t { } pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; -pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; +pub static DISPATCH_QUEUE_CONCURRENT: &dispatch_object_s = + unsafe { &_dispatch_queue_attr_concurrent }; + +pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &dispatch_object_s = + unsafe { &_dispatch_source_type_data_add }; +pub static DISPATCH_SOURCE_TYPE_DATA_OR: &dispatch_object_s = + unsafe { &_dispatch_source_type_data_or }; +pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &dispatch_object_s = + unsafe { &_dispatch_source_type_mach_recv }; +pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &dispatch_object_s = + unsafe { &_dispatch_source_type_mach_send }; +pub static DISPATCH_SOURCE_TYPE_PROC: &dispatch_object_s = unsafe { &_dispatch_source_type_proc }; +pub static DISPATCH_SOURCE_TYPE_READ: &dispatch_object_s = unsafe { &_dispatch_source_type_read }; +pub static DISPATCH_SOURCE_TYPE_SIGNAL: &dispatch_object_s = + unsafe { &_dispatch_source_type_signal }; +pub static DISPATCH_SOURCE_TYPE_TIMER: &dispatch_object_s = unsafe { &_dispatch_source_type_timer }; +pub static DISPATCH_SOURCE_TYPE_VNODE: &dispatch_object_s = unsafe { &_dispatch_source_type_vnode }; +pub static DISPATCH_SOURCE_TYPE_WRITE: &dispatch_object_s = unsafe { &_dispatch_source_type_write }; -pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; -pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; -pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; +pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; +pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; -pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; #[cfg(test)] @@ -155,7 +249,7 @@ mod tests { use std::os::raw::c_void; use std::ptr; - extern fn serial_queue_test_add(num: *mut c_void) { + extern "C" fn serial_queue_test_add(num: *mut c_void) { unsafe { *(num as *mut u32) = 1; } diff --git a/src/group.rs b/src/group.rs index 6ba6c5b..84c0741 100644 --- a/src/group.rs +++ b/src/group.rs @@ -1,8 +1,8 @@ use std::time::Duration; use crate::ffi::*; -use crate::{context_and_function, time_after_delay, WaitTimeout}; use crate::queue::Queue; +use crate::{context_and_function, time_after_delay, WaitTimeout}; /// A Grand Central Dispatch group. /// @@ -18,7 +18,9 @@ impl Group { /// Creates a new dispatch `Group`. pub fn create() -> Group { unsafe { - Group { ptr: dispatch_group_create() } + Group { + ptr: dispatch_group_create(), + } } } @@ -32,7 +34,9 @@ impl Group { /// Submits a closure asynchronously to the given `Queue` and associates it /// with self. pub fn exec_async(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_async_f(self.ptr, queue.ptr, context, work); @@ -43,7 +47,9 @@ impl Group { /// associated with self have completed. /// If self is empty, the closure is submitted immediately. pub fn notify(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_notify_f(self.ptr, queue.ptr, context, work); @@ -52,9 +58,7 @@ impl Group { /// Waits synchronously for all tasks associated with self to complete. pub fn wait(&self) { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch group wait errored"); } @@ -63,9 +67,7 @@ impl Group { /// Returns true if the tasks completed or false if the timeout elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_group_wait(self.ptr, when) - }; + let result = unsafe { dispatch_group_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -75,15 +77,13 @@ impl Group { /// Returns whether self is currently empty. pub fn is_empty(&self) -> bool { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) }; result == 0 } } -unsafe impl Sync for Group { } -unsafe impl Send for Group { } +unsafe impl Sync for Group {} +unsafe impl Send for Group {} impl Clone for Group { fn clone(&self) -> Self { @@ -113,11 +113,13 @@ impl GroupGuard { unsafe { dispatch_group_enter(group.ptr); } - GroupGuard { group: group.clone() } + GroupGuard { + group: group.clone(), + } } /// Drops self, leaving the `Group`. - pub fn leave(self) { } + pub fn leave(self) {} } impl Clone for GroupGuard { @@ -136,9 +138,9 @@ impl Drop for GroupGuard { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; - use crate::{Queue, QueueAttribute}; use super::Group; + use crate::{Queue, QueueAttribute}; + use std::sync::{Arc, Mutex}; #[test] fn test_group() { diff --git a/src/lib.rs b/src/lib.rs index c5cd981..eb46c09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![deny(missing_docs)] /*! Rust wrapper for Apple's Grand Central Dispatch (GCD). @@ -40,9 +41,39 @@ assert!(nums == [2, 3]); let nums = queue.map(nums, |x| x.to_string()); assert!(nums[0] == "2"); ``` -*/ -#![warn(missing_docs)] +# Timer Events + +GCD provides a timer facility that can be used to schedule blocks of code to +execute periodically, starting after a delay. The `TimerNode` type is a wrapper +around a dispatch source that can be used to schedule timer events. + +`TimerNode` has the `schedule` method to schedule a timer event, the `update` +method to update the timer's interval, and the `cancel` method to cancel the +timer. Dropping a `TimerNode` will cancel the timer. + +``` +use dispatch::TimerNode; +use std::time::Duration; +use std::thread::sleep; +use std::sync::{Arc, Mutex}; + +let count = Arc::new(Mutex::new(0)); +let count_clone = count.clone(); +let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! -> {}", *count); +}).unwrap(); +sleep(Duration::from_millis(100)); +timer.update(Duration::from_millis(20), Duration::from_secs(0), None); // change the time period +sleep(Duration::from_millis(100)); +drop(timer); // cancel the timer +println!("Counter: {}", *count.lock().unwrap()); +assert!(*count.lock().unwrap() >= 15); +``` + +*/ use std::error::Error; use std::fmt; @@ -56,13 +87,15 @@ pub use crate::group::{Group, GroupGuard}; pub use crate::once::Once; pub use crate::queue::{Queue, QueueAttribute, QueuePriority, SuspendGuard}; pub use crate::sem::{Semaphore, SemaphoreGuard}; +pub use crate::source::TimerNode; /// Raw foreign function interface for libdispatch. pub mod ffi; mod group; -mod queue; mod once; +mod queue; mod sem; +mod source; /// An error indicating a wait timed out. #[derive(Clone, Debug)] @@ -76,58 +109,94 @@ impl fmt::Display for WaitTimeout { } } -impl Error for WaitTimeout { } +impl Error for WaitTimeout {} fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay.as_secs().checked_mul(1_000_000_000).and_then(|i| { - i.checked_add(delay.subsec_nanos() as u64) - }).and_then(|i| { - if i < (i64::max_value() as u64) { Some(i as i64) } else { None } - }).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { - dispatch_time(DISPATCH_TIME_NOW, i) - }) + delay + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|i| i.checked_add(delay.subsec_nanos() as u64)) + .and_then(|i| { + if i < (i64::MAX as u64) { + Some(i as i64) + } else { + None + } + }) + .map_or(DISPATCH_TIME_FOREVER, |i| unsafe { + dispatch_time(DISPATCH_TIME_NOW, i) + }) } fn context_and_function(closure: F) -> (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_execute_closure(context: Box) where F: FnOnce() { +where + F: FnOnce(), +{ + extern "C" fn work_execute_closure(context: Box) + where + F: FnOnce(), + { (*context)(); } let closure = Box::new(closure); - let func: extern fn(Box) = work_execute_closure::; + let func: extern "C" fn(Box) = work_execute_closure::; unsafe { - (mem::transmute(closure), mem::transmute(func)) + ( + mem::transmute::, *mut std::ffi::c_void>(closure), + mem::transmute::), extern "C" fn(*mut std::ffi::c_void)>( + func, + ), + ) } } -fn context_and_sync_function(closure: &mut Option) -> - (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_read_closure(context: &mut Option) where F: FnOnce() { +fn context_and_sync_function(closure: &mut Option) -> (*mut c_void, dispatch_function_t) +where + F: FnOnce(), +{ + extern "C" fn work_read_closure(context: &mut Option) + where + F: FnOnce(), + { // This is always passed Some, so it's safe to unwrap let closure = context.take().unwrap(); closure(); } let context: *mut Option = closure; - let func: extern fn(&mut Option) = work_read_closure::; + let func: extern "C" fn(&mut Option) = work_read_closure::; unsafe { - (context as *mut c_void, mem::transmute(func)) + ( + context as *mut c_void, + mem::transmute::< + for<'a> extern "C" fn(&'a mut std::option::Option), + extern "C" fn(*mut std::ffi::c_void), + >(func), + ) } } -fn context_and_apply_function(closure: &F) -> - (*mut c_void, extern fn(*mut c_void, usize)) - where F: Fn(usize) { - extern fn work_apply_closure(context: &F, iter: usize) - where F: Fn(usize) { +fn context_and_apply_function(closure: &F) -> (*mut c_void, extern "C" fn(*mut c_void, usize)) +where + F: Fn(usize), +{ + extern "C" fn work_apply_closure(context: &F, iter: usize) + where + F: Fn(usize), + { context(iter); } let context: *const F = closure; - let func: extern fn(&F, usize) = work_apply_closure::; + let func: extern "C" fn(&F, usize) = work_apply_closure::; unsafe { - (context as *mut c_void, mem::transmute(func)) + ( + context as *mut c_void, + mem::transmute::< + for<'a> extern "C" fn(&'a F, usize), + extern "C" fn(*mut std::ffi::c_void, usize), + >(func), + ) } } diff --git a/src/once.rs b/src/once.rs index c1dbfa8..7ae089e 100644 --- a/src/once.rs +++ b/src/once.rs @@ -1,7 +1,7 @@ use std::cell::UnsafeCell; -use crate::ffi::*; use crate::context_and_sync_function; +use crate::ffi::*; /// A predicate used to execute a closure only once for the lifetime of an /// application. @@ -13,7 +13,9 @@ pub struct Once { impl Once { /// Creates a new `Once`. pub const fn new() -> Once { - Once { predicate: UnsafeCell::new(0) } + Once { + predicate: UnsafeCell::new(0), + } } /// Executes a closure once, ensuring that no other closure has been or @@ -22,11 +24,16 @@ impl Once { /// If called simultaneously from multiple threads, waits synchronously // until the work has completed. #[inline(always)] - pub fn call_once(&'static self, work: F) where F: FnOnce() { + pub fn call_once(&'static self, work: F) + where + F: FnOnce(), + { #[cold] #[inline(never)] fn once(predicate: *mut dispatch_once_t, work: F) - where F: FnOnce() { + where + F: FnOnce(), + { let mut work = Some(work); let (context, work) = context_and_sync_function(&mut work); unsafe { @@ -43,7 +50,13 @@ impl Once { } } -unsafe impl Sync for Once { } +impl Default for Once { + fn default() -> Self { + Self::new() + } +} + +unsafe impl Sync for Once {} #[cfg(test)] mod tests { diff --git a/src/queue.rs b/src/queue.rs index 46ed9cd..2f43c62 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -6,8 +6,7 @@ use std::time::Duration; use crate::ffi::*; use crate::{ - context_and_function, context_and_sync_function, context_and_apply_function, - time_after_delay, + context_and_apply_function, context_and_function, context_and_sync_function, time_after_delay, }; /// The type of a dispatch queue. @@ -59,9 +58,9 @@ pub enum QueuePriority { impl QueuePriority { fn as_raw(&self) -> c_long { match *self { - QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, - QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, - QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, + QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, + QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, + QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, QueuePriority::Background => DISPATCH_QUEUE_PRIORITY_BACKGROUND, } } @@ -100,9 +99,7 @@ impl Queue { /// Creates a new dispatch `Queue`. pub fn create(label: &str, attr: QueueAttribute) -> Self { let label = CString::new(label).unwrap(); - let queue = unsafe { - dispatch_queue_create(label.as_ptr(), attr.as_raw()) - }; + let queue = unsafe { dispatch_queue_create(label.as_ptr(), attr.as_raw()) }; Queue { ptr: queue } } @@ -111,8 +108,7 @@ impl Queue { /// A dispatch queue's priority is inherited from its target queue. /// Additionally, if both the queue and its target are serial queues, /// their blocks will not be invoked concurrently. - pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) - -> Self { + pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) -> Self { let queue = Queue::create(label, attr); unsafe { dispatch_set_target_queue(queue.ptr, target.ptr); @@ -134,7 +130,10 @@ impl Queue { /// Submits a closure for execution on self and waits until it completes. pub fn exec_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -154,7 +153,10 @@ impl Queue { /// Submits a closure for asynchronous execution on self and returns /// immediately. - pub fn exec_async(&self, work: F) where F: 'static + Send + FnOnce() { + pub fn exec_async(&self, work: F) + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_async_f(self.ptr, context, work); @@ -164,7 +166,9 @@ impl Queue { /// After the specified delay, submits a closure for asynchronous execution /// on self. pub fn exec_after(&self, delay: Duration, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let when = time_after_delay(delay); let (context, work) = context_and_function(work); unsafe { @@ -175,7 +179,9 @@ impl Queue { /// Submits a closure to be executed on self the given number of iterations /// and waits until it completes. pub fn apply(&self, iterations: usize, work: F) - where F: Sync + Fn(usize) { + where + F: Sync + Fn(usize), + { let (context, work) = context_and_apply_function(&work); unsafe { dispatch_apply_f(iterations, self.ptr, context, work); @@ -185,10 +191,13 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided slice and waits until it completes. pub fn for_each(&self, slice: &mut [T], work: F) - where F: Sync + Fn(&mut T), T: Send { + where + F: Sync + Fn(&mut T), + T: Send, + { let slice_ptr = slice.as_mut_ptr(); let work = move |i| unsafe { - work(&mut *slice_ptr.offset(i as isize)); + work(&mut *slice_ptr.add(i)); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -199,7 +208,11 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided vector and returns a `Vec` of the mapped elements. pub fn map(&self, vec: Vec, work: F) -> Vec - where F: Sync + Fn(T) -> U, T: Send, U: Send { + where + F: Sync + Fn(T) -> U, + T: Send, + U: Send, + { let mut src = vec; let len = src.len(); let src_ptr = src.as_ptr(); @@ -208,8 +221,8 @@ impl Queue { let dest_ptr = dest.as_mut_ptr(); let work = move |i| unsafe { - let result = work(ptr::read(src_ptr.offset(i as isize))); - ptr::write(dest_ptr.offset(i as isize), result); + let result = work(ptr::read(src_ptr.add(i))); + ptr::write(dest_ptr.add(i), result); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -234,7 +247,10 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `sync` method. pub fn barrier_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -265,7 +281,9 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `async` method. pub fn barrier_async(&self, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_barrier_async_f(self.ptr, context, work); @@ -283,8 +301,8 @@ impl Queue { } } -unsafe impl Sync for Queue { } -unsafe impl Send for Queue { } +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} impl Clone for Queue { fn clone(&self) -> Self { @@ -314,11 +332,13 @@ impl SuspendGuard { unsafe { dispatch_suspend(queue.ptr); } - SuspendGuard { queue: queue.clone() } + SuspendGuard { + queue: queue.clone(), + } } /// Drops self, allowing the suspended `Queue` to resume. - pub fn resume(self) { } + pub fn resume(self) {} } impl Clone for SuspendGuard { @@ -337,10 +357,10 @@ impl Drop for SuspendGuard { #[cfg(test)] mod tests { + use super::*; + use crate::Group; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; - use crate::Group; - use super::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); diff --git a/src/sem.rs b/src/sem.rs index b6faa24..bb70826 100644 --- a/src/sem.rs +++ b/src/sem.rs @@ -18,26 +18,20 @@ impl Semaphore { /// successful calls to `wait` than `signal`, the system assumes the /// `Semaphore` is still in use and will abort if it is disposed. pub fn new(value: u32) -> Self { - let ptr = unsafe { - dispatch_semaphore_create(value as c_long) - }; + let ptr = unsafe { dispatch_semaphore_create(value as c_long) }; Semaphore { ptr } } /// Wait for (decrement) self. pub fn wait(&self) { - let result = unsafe { - dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch semaphore wait errored"); } /// Wait for (decrement) self until the specified timeout has elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_semaphore_wait(self.ptr, when) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -50,9 +44,7 @@ impl Semaphore { /// If the previous value was less than zero, this method wakes a waiting thread. /// Returns `true` if a thread is woken or `false` otherwise. pub fn signal(&self) -> bool { - unsafe { - dispatch_semaphore_signal(self.ptr) != 0 - } + unsafe { dispatch_semaphore_signal(self.ptr) != 0 } } /// Wait to access a resource protected by self. @@ -64,8 +56,7 @@ impl Semaphore { /// Wait until the specified timeout to access a resource protected by self. /// This decrements self and returns a guard that increments when dropped. - pub fn access_timeout(&self, timeout: Duration) - -> Result { + pub fn access_timeout(&self, timeout: Duration) -> Result { self.wait_timeout(timeout)?; Ok(SemaphoreGuard::new(self.clone())) } @@ -103,7 +94,7 @@ impl SemaphoreGuard { } /// Drops self, signaling the `Semaphore`. - pub fn signal(self) { } + pub fn signal(self) {} } impl Drop for SemaphoreGuard { diff --git a/src/source.rs b/src/source.rs new file mode 100644 index 0000000..f2bfd8f --- /dev/null +++ b/src/source.rs @@ -0,0 +1,154 @@ +use std::fmt::{Debug, Formatter, Result as FmtResult}; +use std::io::{Error, ErrorKind}; +use std::mem::transmute; +use std::os::raw::c_void; +use std::ptr; +use std::time::Duration; + +use crate::ffi::*; +use crate::time_after_delay; + +/// Source: https://stackoverflow.com/a/32270215/5214809 +extern "C" fn timer_handler(arg: *mut c_void) { + let closure: &mut Box = + unsafe { &mut *(arg as *mut std::boxed::Box) }; + closure(); +} + +/// A timer node. +/// +/// Schedule a timer to run a block of code after a delay using `TimerNode::schedule`. +/// Drop the timer to cancel it. +pub struct TimerNode { + timer: dispatch_source_t, +} + +impl Debug for TimerNode { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "TimerNode {{ timer: {:?} }}", self.timer) + } +} + +impl TimerNode { + /// Schedules a timer to run a block of code after a delay. + /// + /// The block will be run on the default global queue. + /// + /// # Arguments + /// - interval: The interval between executions of the block. + /// - delay: The delay before the first execution of the block. + /// - leeway: The leeway to apply to the timer. + /// - block: The block of code to execute. + /// + /// # Returns + /// A `Result` containing either a `TimerNode` or an `Error`. + /// + pub fn schedule( + interval: Duration, + delay: Duration, + leeway: Option, + block: F, + ) -> Result + where + F: FnMut() + Send, + { + let context: Box> = Box::new(Box::new(block)); + let timer = unsafe { + dispatch_source_create( + DISPATCH_SOURCE_TYPE_TIMER, + std::ptr::null(), + 0, + ptr::null_mut(), + ) + }; + if timer.is_null() { + return Err(Error::new(ErrorKind::Other, "Failed to create timer")); + } + let when = time_after_delay(delay); + let leeway = leeway.unwrap_or(Duration::from_millis(0)); + unsafe { + dispatch_set_context( + timer, + transmute::< + std::boxed::Box>, + *mut std::ffi::c_void, + >(context), + ); + dispatch_source_set_event_handler_f(timer, timer_handler); + dispatch_source_set_timer( + timer, + when, + interval.as_nanos() as u64, + leeway.as_nanos() as u64, + ); + dispatch_resume(timer); + } + let node = TimerNode { timer }; + Ok(node) + } + + /// Update the timer with a new interval and delay. + /// + /// # Arguments + /// - interval: The new interval between executions of the block. + /// - delay: The new delay before the first execution of the block. + /// - leeway: The new leeway to apply to the timer. + pub fn update(&self, interval: Duration, delay: Duration, leeway: Option) { + let when = time_after_delay(delay); + let leeway = leeway.unwrap_or(Duration::from_millis(0)); + unsafe { + dispatch_suspend(self.timer); + dispatch_source_set_timer( + self.timer, + when, + interval.as_nanos() as u64, + leeway.as_nanos() as u64, + ); + dispatch_resume(self.timer); + }; + } + + /// Cancel the timer. + fn cancel(&self) { + unsafe { + let context = dispatch_get_context(self.timer); + dispatch_source_cancel(self.timer); + dispatch_release(self.timer); + let _: Box> = Box::from_raw(context as *mut _); + } + } +} + +impl Drop for TimerNode { + fn drop(&mut self) { + self.cancel(); + } +} + +#[cfg(test)] +mod test { + use crate::source::TimerNode; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::Duration; + + #[test] + fn test_timer() { + let count = Arc::new(Mutex::new(0)); + let count_clone = count.clone(); + let block = move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! {}", *count); + }; + let _node = TimerNode::schedule( + Duration::from_millis(10), + Duration::from_secs(0), + None, + block, + ) + .unwrap(); + thread::sleep(Duration::from_millis(100)); + assert!(*count.lock().unwrap() >= 10); + } +}