From 1830fc9ad3f763bc330c4baf0477f8c23c77b125 Mon Sep 17 00:00:00 2001 From: "Sunip K. Mukherjee" Date: Fri, 29 Dec 2023 16:32:21 -0500 Subject: [PATCH 1/3] Implemented GCD timer --- Cargo.toml | 4 +- README.md | 31 ++++++++++++ src/ffi.rs | 34 ++++++++++--- src/lib.rs | 34 +++++++++++++ src/source.rs | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 231 insertions(+), 8 deletions(-) create mode 100644 src/source.rs diff --git a/Cargo.toml b/Cargo.toml index 8cc82a8..148fccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dispatch" -version = "0.2.0" -authors = ["Steven Sheldon"] +version = "0.2.1" +authors = ["Steven Sheldon", "Sunip K. Mukherjee"] edition = "2018" description = "Rust wrapper for Apple's Grand Central Dispatch." diff --git a/README.md b/README.md index 42bda2e..bf73555 100644 --- a/README.md +++ b/README.md @@ -42,3 +42,34 @@ assert!(nums == [2, 3]); let nums = queue.map(nums, |x| x.to_string()); assert!(nums[0] == "2"); ``` + +# Timer Events + +GCD provides a timer facility that can be used to schedule blocks of code to +execute periodically, starting after a delay. The `TimerNode` type is a wrapper +around a dispatch source that can be used to schedule timer events. + +`TimerNode` has the `schedule` method to schedule a timer event, the `update` +method to update the timer's interval, and the `cancel` method to cancel the +timer. Dropping a `TimerNode` will cancel the timer. + +``` +use dispatch::TimerNode; +use std::time::Duration; +use std::thread::sleep; +use std::sync::{Arc, Mutex}; + +let count = Arc::new(Mutex::new(0)); +let count_clone = count.clone(); +let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! -> {}", *count); +}).unwrap(); +sleep(Duration::from_millis(100)); +timer.update(Duration::from_millis(20), Duration::from_secs(0)); +sleep(Duration::from_millis(100)); +timer.cancel(); +println!("Counter: {}", *count.lock().unwrap()); +assert!(*count.lock().unwrap() >= 15); +``` \ No newline at end of file diff --git a/src/ffi.rs b/src/ffi.rs index 91dad6f..be478d7 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -11,6 +11,8 @@ pub type dispatch_function_t = extern fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; pub type dispatch_group_t = *mut dispatch_object_s; pub type dispatch_object_t = *mut dispatch_object_s; +pub type dispatch_source_t = *mut dispatch_object_s; +pub type dispatch_source_type_t = *const dispatch_object_s; pub type dispatch_once_t = c_long; pub type dispatch_queue_t = *mut dispatch_object_s; pub type dispatch_time_t = u64; @@ -32,6 +34,16 @@ pub type dispatch_queue_attr_t = *const dispatch_object_s; extern { static _dispatch_main_q: dispatch_object_s; static _dispatch_queue_attr_concurrent: dispatch_object_s; + static _dispatch_source_type_data_add: dispatch_object_s; + static _dispatch_source_type_data_or: dispatch_object_s; + static _dispatch_source_type_mach_recv: dispatch_object_s; + static _dispatch_source_type_mach_send: dispatch_object_s; + static _dispatch_source_type_proc: dispatch_object_s; + static _dispatch_source_type_read: dispatch_object_s; + static _dispatch_source_type_signal: dispatch_object_s; + static _dispatch_source_type_timer: dispatch_object_s; + static _dispatch_source_type_vnode: dispatch_object_s; + static _dispatch_source_type_write: dispatch_object_s; pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t; pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t; @@ -76,9 +88,8 @@ extern { pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - - // void dispatch_source_cancel ( dispatch_source_t source ); - // dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue ); + pub fn dispatch_source_cancel(source: dispatch_source_t); + pub fn dispatch_source_create(type_: dispatch_source_type_t, handle: *const c_void, mask: c_ulong, queue: dispatch_queue_t) -> dispatch_object_t; // unsigned long dispatch_source_get_data ( dispatch_source_t source ); // uintptr_t dispatch_source_get_handle ( dispatch_source_t source ); // unsigned long dispatch_source_get_mask ( dispatch_source_t source ); @@ -86,10 +97,10 @@ extern { // void dispatch_source_set_registration_handler ( dispatch_source_t source, dispatch_block_t handler ); // void dispatch_source_set_registration_handler_f ( dispatch_source_t source, dispatch_function_t handler ); // void dispatch_source_set_cancel_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_cancel_handler_f ( dispatch_source_t source, dispatch_function_t handler ); + pub fn dispatch_source_set_cancel_handler_f(source: dispatch_source_t, handler: dispatch_function_t); // void dispatch_source_set_event_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_event_handler_f ( dispatch_source_t source, dispatch_function_t handler ); - // void dispatch_source_set_timer ( dispatch_source_t source, dispatch_time_t start, uint64_t interval, uint64_t leeway ); + pub fn dispatch_source_set_event_handler_f(source: dispatch_source_t, handler: dispatch_function_t); + pub fn dispatch_source_set_timer(source: dispatch_source_t, start: dispatch_time_t, interval: u64, leeway: u64); // long dispatch_source_testcancel ( dispatch_source_t source ); // void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); @@ -138,6 +149,17 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t { pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; +pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &'static dispatch_object_s = unsafe { &_dispatch_source_type_data_add }; +pub static DISPATCH_SOURCE_TYPE_DATA_OR: &'static dispatch_object_s = unsafe { &_dispatch_source_type_data_or }; +pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &'static dispatch_object_s = unsafe { &_dispatch_source_type_mach_recv }; +pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &'static dispatch_object_s = unsafe { &_dispatch_source_type_mach_send }; +pub static DISPATCH_SOURCE_TYPE_PROC: &'static dispatch_object_s = unsafe { &_dispatch_source_type_proc }; +pub static DISPATCH_SOURCE_TYPE_READ: &'static dispatch_object_s = unsafe { &_dispatch_source_type_read }; +pub static DISPATCH_SOURCE_TYPE_SIGNAL: &'static dispatch_object_s = unsafe { &_dispatch_source_type_signal }; +pub static DISPATCH_SOURCE_TYPE_TIMER: &'static dispatch_object_s = unsafe { &_dispatch_source_type_timer }; +pub static DISPATCH_SOURCE_TYPE_VNODE: &'static dispatch_object_s = unsafe { &_dispatch_source_type_vnode }; +pub static DISPATCH_SOURCE_TYPE_WRITE: &'static dispatch_object_s = unsafe { &_dispatch_source_type_write }; + pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; diff --git a/src/lib.rs b/src/lib.rs index c5cd981..359dc94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,38 @@ assert!(nums == [2, 3]); let nums = queue.map(nums, |x| x.to_string()); assert!(nums[0] == "2"); ``` + +# Timer Events + +GCD provides a timer facility that can be used to schedule blocks of code to +execute periodically, starting after a delay. The `TimerNode` type is a wrapper +around a dispatch source that can be used to schedule timer events. + +`TimerNode` has the `schedule` method to schedule a timer event, the `update` +method to update the timer's interval, and the `cancel` method to cancel the +timer. Dropping a `TimerNode` will cancel the timer. + +``` +use dispatch::TimerNode; +use std::time::Duration; +use std::thread::sleep; +use std::sync::{Arc, Mutex}; + +let count = Arc::new(Mutex::new(0)); +let count_clone = count.clone(); +let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! -> {}", *count); +}).unwrap(); +sleep(Duration::from_millis(100)); +timer.update(Duration::from_millis(20), Duration::from_secs(0)); +sleep(Duration::from_millis(100)); +timer.cancel(); +println!("Counter: {}", *count.lock().unwrap()); +assert!(*count.lock().unwrap() >= 15); +``` + */ #![warn(missing_docs)] @@ -56,6 +88,7 @@ pub use crate::group::{Group, GroupGuard}; pub use crate::once::Once; pub use crate::queue::{Queue, QueueAttribute, QueuePriority, SuspendGuard}; pub use crate::sem::{Semaphore, SemaphoreGuard}; +pub use crate::source::TimerNode; /// Raw foreign function interface for libdispatch. pub mod ffi; @@ -63,6 +96,7 @@ mod group; mod queue; mod once; mod sem; +mod source; /// An error indicating a wait timed out. #[derive(Clone, Debug)] diff --git a/src/source.rs b/src/source.rs new file mode 100644 index 0000000..0cf6775 --- /dev/null +++ b/src/source.rs @@ -0,0 +1,136 @@ +use std::cell::RefCell; +use std::fmt::{Debug, Formatter, Result as FmtResult}; +use std::io::{Error, ErrorKind}; +use std::mem::transmute; +use std::os::raw::c_void; +use std::ptr; +use std::time::Duration; + +use crate::ffi::*; +use crate::time_after_delay; + +/// Source: https://stackoverflow.com/a/32270215/5214809 +extern "C" fn timer_handler(arg: *mut c_void) { + let closure: &mut Box = unsafe { transmute(arg) }; + closure(); +} + +/// A timer node. +pub struct TimerNode { + timer: RefCell>, +} + +impl Debug for TimerNode { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "TimerNode {{ timer: {:?} }}", self.timer) + } +} + +impl TimerNode { + /// Schedules a timer to run a block of code after a delay. + /// + /// The block will be run on the default global queue. + /// + /// # Arguments + /// - interval: The interval between executions of the block. + /// - delay: The delay before the first execution of the block. + /// - leeway: The leeway to apply to the timer. + /// - block: The block of code to execute. + /// + /// # Returns + /// A `Result` containing either a `TimerNode` or an `Error`. + /// + pub fn schedule(interval: Duration, delay: Duration, leeway: Option, block: F) -> Result + where + F: FnMut() + Send, + { + let context: Box> = Box::new(Box::new(block)); + let timer = unsafe { + dispatch_source_create( + DISPATCH_SOURCE_TYPE_TIMER, + std::ptr::null(), + 0, + ptr::null_mut(), + ) + }; + if timer.is_null() { + return Err(Error::new(ErrorKind::Other, "Failed to create timer")); + } + let when = time_after_delay(delay); + let leeway = leeway.unwrap_or(Duration::from_millis(0)); + unsafe { + dispatch_set_context(timer, transmute(context)); + dispatch_source_set_event_handler_f(timer, timer_handler); + dispatch_source_set_timer(timer, when, interval.as_nanos() as u64, leeway.as_nanos() as u64); + dispatch_resume(timer); + } + let node = TimerNode { timer: RefCell::new(Some(timer)) }; + Ok(node) + } + + /// Update the timer with a new interval and delay. + /// + /// # Arguments + /// - interval: The new interval between executions of the block. + /// - delay: The new delay before the first execution of the block. + pub fn update(&self, interval: Duration, delay: Duration) { + let timer = *self.timer.borrow(); + match timer { + Some(timer) => { + let when = time_after_delay(delay); + unsafe { + dispatch_suspend(timer); + dispatch_source_set_timer(timer, when, interval.as_nanos() as u64, 0); + dispatch_resume(timer); + } + } + None => {} + } + } + + /// Cancel the timer. + pub fn cancel(&self) { + let mut timer = self.timer.borrow_mut(); + match *timer { + Some(timer) => { + unsafe { + let context = dispatch_get_context(timer); + dispatch_source_cancel(timer); + dispatch_release(timer); + let _: Box> = Box::from_raw(context as *mut _); + } + } + None => {} + } + *timer = None; + } +} + +impl Drop for TimerNode { + fn drop(&mut self) { + self.cancel(); + } +} + +#[cfg(test)] +mod test { + use crate::source::TimerNode; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::Duration; + + #[test] + fn test_timer() { + let count = Arc::new(Mutex::new(0)); + let count_clone = count.clone(); + let block = move || { + let mut count = count_clone.lock().unwrap(); + *count += 1; + println!("Hello, counter! {}", *count); + }; + let _node = + TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, block).unwrap(); + thread::sleep(Duration::from_millis(100)); + assert!(*count.lock().unwrap() >= 10); + } +} From 89eae1cbf9511916e542b01c90f61358c6cdb8c7 Mon Sep 17 00:00:00 2001 From: "Sunip K. Mukherjee" Date: Tue, 2 Jan 2024 13:03:02 -0500 Subject: [PATCH 2/3] Timer API update: removed explicit cancel method, simply drop the timer to cancel --- README.md | 6 +++--- src/lib.rs | 4 ++-- src/source.rs | 49 ++++++++++++++++++++----------------------------- 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index bf73555..907bee5 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ around a dispatch source that can be used to schedule timer events. method to update the timer's interval, and the `cancel` method to cancel the timer. Dropping a `TimerNode` will cancel the timer. -``` +```rust use dispatch::TimerNode; use std::time::Duration; use std::thread::sleep; @@ -67,9 +67,9 @@ let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_se println!("Hello, counter! -> {}", *count); }).unwrap(); sleep(Duration::from_millis(100)); -timer.update(Duration::from_millis(20), Duration::from_secs(0)); +timer.update(Duration::from_millis(20), Duration::from_secs(0), None); // change the time period sleep(Duration::from_millis(100)); -timer.cancel(); +drop(timer); // cancel the timer println!("Counter: {}", *count.lock().unwrap()); assert!(*count.lock().unwrap() >= 15); ``` \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 359dc94..df6e850 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,9 +65,9 @@ let mut timer = TimerNode::schedule(Duration::from_millis(10), Duration::from_se println!("Hello, counter! -> {}", *count); }).unwrap(); sleep(Duration::from_millis(100)); -timer.update(Duration::from_millis(20), Duration::from_secs(0)); +timer.update(Duration::from_millis(20), Duration::from_secs(0), None); // change the time period sleep(Duration::from_millis(100)); -timer.cancel(); +drop(timer); // cancel the timer println!("Counter: {}", *count.lock().unwrap()); assert!(*count.lock().unwrap() >= 15); ``` diff --git a/src/source.rs b/src/source.rs index 0cf6775..161df2f 100644 --- a/src/source.rs +++ b/src/source.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::fmt::{Debug, Formatter, Result as FmtResult}; use std::io::{Error, ErrorKind}; use std::mem::transmute; @@ -16,8 +15,11 @@ extern "C" fn timer_handler(arg: *mut c_void) { } /// A timer node. +/// +/// Schedule a timer to run a block of code after a delay using `TimerNode::schedule`. +/// Drop the timer to cancel it. pub struct TimerNode { - timer: RefCell>, + timer: dispatch_source_t, } impl Debug for TimerNode { @@ -64,7 +66,7 @@ impl TimerNode { dispatch_source_set_timer(timer, when, interval.as_nanos() as u64, leeway.as_nanos() as u64); dispatch_resume(timer); } - let node = TimerNode { timer: RefCell::new(Some(timer)) }; + let node = TimerNode { timer: timer }; Ok(node) } @@ -73,36 +75,25 @@ impl TimerNode { /// # Arguments /// - interval: The new interval between executions of the block. /// - delay: The new delay before the first execution of the block. - pub fn update(&self, interval: Duration, delay: Duration) { - let timer = *self.timer.borrow(); - match timer { - Some(timer) => { - let when = time_after_delay(delay); - unsafe { - dispatch_suspend(timer); - dispatch_source_set_timer(timer, when, interval.as_nanos() as u64, 0); - dispatch_resume(timer); - } - } - None => {} - } + /// - leeway: The new leeway to apply to the timer. + pub fn update(&self, interval: Duration, delay: Duration, leeway: Option) { + let when = time_after_delay(delay); + let leeway = leeway.unwrap_or(Duration::from_millis(0)); + unsafe { + dispatch_suspend(self.timer); + dispatch_source_set_timer(self.timer, when, interval.as_nanos() as u64, leeway.as_nanos() as u64); + dispatch_resume(self.timer); + }; } /// Cancel the timer. - pub fn cancel(&self) { - let mut timer = self.timer.borrow_mut(); - match *timer { - Some(timer) => { - unsafe { - let context = dispatch_get_context(timer); - dispatch_source_cancel(timer); - dispatch_release(timer); - let _: Box> = Box::from_raw(context as *mut _); - } - } - None => {} + fn cancel(&self) { + unsafe { + let context = dispatch_get_context(self.timer); + dispatch_source_cancel(self.timer); + dispatch_release(self.timer); + let _: Box> = Box::from_raw(context as *mut _); } - *timer = None; } } From 1150158a9f0eb62ce593e115e1b56c3fa2720ac7 Mon Sep 17 00:00:00 2001 From: "Sunip K. Mukherjee" Date: Mon, 9 Sep 2024 20:25:16 -0400 Subject: [PATCH 3/3] Lint --- Cargo.toml | 2 +- examples/main.rs | 2 +- src/ffi.rs | 152 ++++++++++++++++++++++++++++++++++------------- src/group.rs | 40 +++++++------ src/lib.rs | 91 +++++++++++++++++++--------- src/once.rs | 23 +++++-- src/queue.rs | 74 ++++++++++++++--------- src/sem.rs | 21 ++----- src/source.rs | 45 +++++++++++--- 9 files changed, 305 insertions(+), 145 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 148fccc..b09450a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dispatch" -version = "0.2.1" +version = "0.2.2" authors = ["Steven Sheldon", "Sunip K. Mukherjee"] edition = "2018" diff --git a/examples/main.rs b/examples/main.rs index c92dcf1..6841a7e 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -1,8 +1,8 @@ extern crate dispatch; +use dispatch::{Queue, QueuePriority}; use std::io; use std::process::exit; -use dispatch::{Queue, QueuePriority}; /// Prompts for a number and adds it to the given sum. /// diff --git a/src/ffi.rs b/src/ffi.rs index be478d7..3e8225d 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -4,10 +4,12 @@ use std::os::raw::{c_char, c_long, c_ulong, c_void}; #[repr(C)] -pub struct dispatch_object_s { _private: [u8; 0] } +pub struct dispatch_object_s { + _private: [u8; 0], +} // dispatch_block_t -pub type dispatch_function_t = extern fn(*mut c_void); +pub type dispatch_function_t = extern "C" fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; pub type dispatch_group_t = *mut dispatch_object_s; pub type dispatch_object_t = *mut dispatch_object_s; @@ -27,11 +29,15 @@ pub type dispatch_time_t = u64; // dispatch_io_interval_flags_t pub type dispatch_queue_attr_t = *const dispatch_object_s; -#[cfg_attr(any(target_os = "macos", target_os = "ios"), - link(name = "System", kind = "dylib"))] -#[cfg_attr(not(any(target_os = "macos", target_os = "ios")), - link(name = "dispatch", kind = "dylib"))] -extern { +#[cfg_attr( + any(target_os = "macos", target_os = "ios"), + link(name = "System", kind = "dylib") +)] +#[cfg_attr( + not(any(target_os = "macos", target_os = "ios")), + link(name = "dispatch", kind = "dylib") +)] +extern "C" { static _dispatch_main_q: dispatch_object_s; static _dispatch_queue_attr_concurrent: dispatch_object_s; static _dispatch_source_type_data_add: dispatch_object_s; @@ -46,30 +52,65 @@ extern { static _dispatch_source_type_write: dispatch_object_s; pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t; - pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t; + pub fn dispatch_queue_create( + label: *const c_char, + attr: dispatch_queue_attr_t, + ) -> dispatch_queue_t; // dispatch_queue_attr_t dispatch_queue_attr_make_with_qos_class ( dispatch_queue_attr_t attr, dispatch_qos_class_t qos_class, int relative_priority ); pub fn dispatch_queue_get_label(queue: dispatch_queue_t) -> *const c_char; pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t); pub fn dispatch_main(); // void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_after_f(when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_after_f( + when: dispatch_time_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) ); - pub fn dispatch_apply_f(iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern fn(*mut c_void, usize)); + pub fn dispatch_apply_f( + iterations: usize, + queue: dispatch_queue_t, + context: *mut c_void, + work: extern "C" fn(*mut c_void, usize), + ); // void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block ); - pub fn dispatch_once_f(predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t); + pub fn dispatch_once_f( + predicate: *mut dispatch_once_t, + context: *mut c_void, + function: dispatch_function_t, + ); // void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_async_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_async_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_create() -> dispatch_group_t; pub fn dispatch_group_enter(group: dispatch_group_t); pub fn dispatch_group_leave(group: dispatch_group_t); // void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_notify_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_notify_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_wait(group: dispatch_group_t, timeout: dispatch_time_t) -> c_long; pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void; @@ -82,14 +123,28 @@ extern { pub fn dispatch_semaphore_create(value: c_long) -> dispatch_semaphore_t; pub fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> c_long; - pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long; + pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) + -> c_long; // void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_source_cancel(source: dispatch_source_t); - pub fn dispatch_source_create(type_: dispatch_source_type_t, handle: *const c_void, mask: c_ulong, queue: dispatch_queue_t) -> dispatch_object_t; + pub fn dispatch_source_create( + type_: dispatch_source_type_t, + handle: *const c_void, + mask: c_ulong, + queue: dispatch_queue_t, + ) -> dispatch_object_t; // unsigned long dispatch_source_get_data ( dispatch_source_t source ); // uintptr_t dispatch_source_get_handle ( dispatch_source_t source ); // unsigned long dispatch_source_get_mask ( dispatch_source_t source ); @@ -97,10 +152,21 @@ extern { // void dispatch_source_set_registration_handler ( dispatch_source_t source, dispatch_block_t handler ); // void dispatch_source_set_registration_handler_f ( dispatch_source_t source, dispatch_function_t handler ); // void dispatch_source_set_cancel_handler ( dispatch_source_t source, dispatch_block_t handler ); - pub fn dispatch_source_set_cancel_handler_f(source: dispatch_source_t, handler: dispatch_function_t); + pub fn dispatch_source_set_cancel_handler_f( + source: dispatch_source_t, + handler: dispatch_function_t, + ); // void dispatch_source_set_event_handler ( dispatch_source_t source, dispatch_block_t handler ); - pub fn dispatch_source_set_event_handler_f(source: dispatch_source_t, handler: dispatch_function_t); - pub fn dispatch_source_set_timer(source: dispatch_source_t, start: dispatch_time_t, interval: u64, leeway: u64); + pub fn dispatch_source_set_event_handler_f( + source: dispatch_source_t, + handler: dispatch_function_t, + ); + pub fn dispatch_source_set_timer( + source: dispatch_source_t, + start: dispatch_time_t, + interval: u64, + leeway: u64, + ); // long dispatch_source_testcancel ( dispatch_source_t source ); // void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); @@ -147,25 +213,31 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t { } pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; -pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; - -pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &'static dispatch_object_s = unsafe { &_dispatch_source_type_data_add }; -pub static DISPATCH_SOURCE_TYPE_DATA_OR: &'static dispatch_object_s = unsafe { &_dispatch_source_type_data_or }; -pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &'static dispatch_object_s = unsafe { &_dispatch_source_type_mach_recv }; -pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &'static dispatch_object_s = unsafe { &_dispatch_source_type_mach_send }; -pub static DISPATCH_SOURCE_TYPE_PROC: &'static dispatch_object_s = unsafe { &_dispatch_source_type_proc }; -pub static DISPATCH_SOURCE_TYPE_READ: &'static dispatch_object_s = unsafe { &_dispatch_source_type_read }; -pub static DISPATCH_SOURCE_TYPE_SIGNAL: &'static dispatch_object_s = unsafe { &_dispatch_source_type_signal }; -pub static DISPATCH_SOURCE_TYPE_TIMER: &'static dispatch_object_s = unsafe { &_dispatch_source_type_timer }; -pub static DISPATCH_SOURCE_TYPE_VNODE: &'static dispatch_object_s = unsafe { &_dispatch_source_type_vnode }; -pub static DISPATCH_SOURCE_TYPE_WRITE: &'static dispatch_object_s = unsafe { &_dispatch_source_type_write }; - -pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; -pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; -pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +pub static DISPATCH_QUEUE_CONCURRENT: &dispatch_object_s = + unsafe { &_dispatch_queue_attr_concurrent }; + +pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &dispatch_object_s = + unsafe { &_dispatch_source_type_data_add }; +pub static DISPATCH_SOURCE_TYPE_DATA_OR: &dispatch_object_s = + unsafe { &_dispatch_source_type_data_or }; +pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &dispatch_object_s = + unsafe { &_dispatch_source_type_mach_recv }; +pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &dispatch_object_s = + unsafe { &_dispatch_source_type_mach_send }; +pub static DISPATCH_SOURCE_TYPE_PROC: &dispatch_object_s = unsafe { &_dispatch_source_type_proc }; +pub static DISPATCH_SOURCE_TYPE_READ: &dispatch_object_s = unsafe { &_dispatch_source_type_read }; +pub static DISPATCH_SOURCE_TYPE_SIGNAL: &dispatch_object_s = + unsafe { &_dispatch_source_type_signal }; +pub static DISPATCH_SOURCE_TYPE_TIMER: &dispatch_object_s = unsafe { &_dispatch_source_type_timer }; +pub static DISPATCH_SOURCE_TYPE_VNODE: &dispatch_object_s = unsafe { &_dispatch_source_type_vnode }; +pub static DISPATCH_SOURCE_TYPE_WRITE: &dispatch_object_s = unsafe { &_dispatch_source_type_write }; + +pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; +pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; +pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; -pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; #[cfg(test)] @@ -177,7 +249,7 @@ mod tests { use std::os::raw::c_void; use std::ptr; - extern fn serial_queue_test_add(num: *mut c_void) { + extern "C" fn serial_queue_test_add(num: *mut c_void) { unsafe { *(num as *mut u32) = 1; } diff --git a/src/group.rs b/src/group.rs index 6ba6c5b..84c0741 100644 --- a/src/group.rs +++ b/src/group.rs @@ -1,8 +1,8 @@ use std::time::Duration; use crate::ffi::*; -use crate::{context_and_function, time_after_delay, WaitTimeout}; use crate::queue::Queue; +use crate::{context_and_function, time_after_delay, WaitTimeout}; /// A Grand Central Dispatch group. /// @@ -18,7 +18,9 @@ impl Group { /// Creates a new dispatch `Group`. pub fn create() -> Group { unsafe { - Group { ptr: dispatch_group_create() } + Group { + ptr: dispatch_group_create(), + } } } @@ -32,7 +34,9 @@ impl Group { /// Submits a closure asynchronously to the given `Queue` and associates it /// with self. pub fn exec_async(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_async_f(self.ptr, queue.ptr, context, work); @@ -43,7 +47,9 @@ impl Group { /// associated with self have completed. /// If self is empty, the closure is submitted immediately. pub fn notify(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_notify_f(self.ptr, queue.ptr, context, work); @@ -52,9 +58,7 @@ impl Group { /// Waits synchronously for all tasks associated with self to complete. pub fn wait(&self) { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch group wait errored"); } @@ -63,9 +67,7 @@ impl Group { /// Returns true if the tasks completed or false if the timeout elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_group_wait(self.ptr, when) - }; + let result = unsafe { dispatch_group_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -75,15 +77,13 @@ impl Group { /// Returns whether self is currently empty. pub fn is_empty(&self) -> bool { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) }; result == 0 } } -unsafe impl Sync for Group { } -unsafe impl Send for Group { } +unsafe impl Sync for Group {} +unsafe impl Send for Group {} impl Clone for Group { fn clone(&self) -> Self { @@ -113,11 +113,13 @@ impl GroupGuard { unsafe { dispatch_group_enter(group.ptr); } - GroupGuard { group: group.clone() } + GroupGuard { + group: group.clone(), + } } /// Drops self, leaving the `Group`. - pub fn leave(self) { } + pub fn leave(self) {} } impl Clone for GroupGuard { @@ -136,9 +138,9 @@ impl Drop for GroupGuard { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; - use crate::{Queue, QueueAttribute}; use super::Group; + use crate::{Queue, QueueAttribute}; + use std::sync::{Arc, Mutex}; #[test] fn test_group() { diff --git a/src/lib.rs b/src/lib.rs index df6e850..eb46c09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![deny(missing_docs)] /*! Rust wrapper for Apple's Grand Central Dispatch (GCD). @@ -74,8 +75,6 @@ assert!(*count.lock().unwrap() >= 15); */ -#![warn(missing_docs)] - use std::error::Error; use std::fmt; use std::mem; @@ -93,8 +92,8 @@ pub use crate::source::TimerNode; /// Raw foreign function interface for libdispatch. pub mod ffi; mod group; -mod queue; mod once; +mod queue; mod sem; mod source; @@ -110,58 +109,94 @@ impl fmt::Display for WaitTimeout { } } -impl Error for WaitTimeout { } +impl Error for WaitTimeout {} fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay.as_secs().checked_mul(1_000_000_000).and_then(|i| { - i.checked_add(delay.subsec_nanos() as u64) - }).and_then(|i| { - if i < (i64::max_value() as u64) { Some(i as i64) } else { None } - }).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { - dispatch_time(DISPATCH_TIME_NOW, i) - }) + delay + .as_secs() + .checked_mul(1_000_000_000) + .and_then(|i| i.checked_add(delay.subsec_nanos() as u64)) + .and_then(|i| { + if i < (i64::MAX as u64) { + Some(i as i64) + } else { + None + } + }) + .map_or(DISPATCH_TIME_FOREVER, |i| unsafe { + dispatch_time(DISPATCH_TIME_NOW, i) + }) } fn context_and_function(closure: F) -> (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_execute_closure(context: Box) where F: FnOnce() { +where + F: FnOnce(), +{ + extern "C" fn work_execute_closure(context: Box) + where + F: FnOnce(), + { (*context)(); } let closure = Box::new(closure); - let func: extern fn(Box) = work_execute_closure::; + let func: extern "C" fn(Box) = work_execute_closure::; unsafe { - (mem::transmute(closure), mem::transmute(func)) + ( + mem::transmute::, *mut std::ffi::c_void>(closure), + mem::transmute::), extern "C" fn(*mut std::ffi::c_void)>( + func, + ), + ) } } -fn context_and_sync_function(closure: &mut Option) -> - (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_read_closure(context: &mut Option) where F: FnOnce() { +fn context_and_sync_function(closure: &mut Option) -> (*mut c_void, dispatch_function_t) +where + F: FnOnce(), +{ + extern "C" fn work_read_closure(context: &mut Option) + where + F: FnOnce(), + { // This is always passed Some, so it's safe to unwrap let closure = context.take().unwrap(); closure(); } let context: *mut Option = closure; - let func: extern fn(&mut Option) = work_read_closure::; + let func: extern "C" fn(&mut Option) = work_read_closure::; unsafe { - (context as *mut c_void, mem::transmute(func)) + ( + context as *mut c_void, + mem::transmute::< + for<'a> extern "C" fn(&'a mut std::option::Option), + extern "C" fn(*mut std::ffi::c_void), + >(func), + ) } } -fn context_and_apply_function(closure: &F) -> - (*mut c_void, extern fn(*mut c_void, usize)) - where F: Fn(usize) { - extern fn work_apply_closure(context: &F, iter: usize) - where F: Fn(usize) { +fn context_and_apply_function(closure: &F) -> (*mut c_void, extern "C" fn(*mut c_void, usize)) +where + F: Fn(usize), +{ + extern "C" fn work_apply_closure(context: &F, iter: usize) + where + F: Fn(usize), + { context(iter); } let context: *const F = closure; - let func: extern fn(&F, usize) = work_apply_closure::; + let func: extern "C" fn(&F, usize) = work_apply_closure::; unsafe { - (context as *mut c_void, mem::transmute(func)) + ( + context as *mut c_void, + mem::transmute::< + for<'a> extern "C" fn(&'a F, usize), + extern "C" fn(*mut std::ffi::c_void, usize), + >(func), + ) } } diff --git a/src/once.rs b/src/once.rs index c1dbfa8..7ae089e 100644 --- a/src/once.rs +++ b/src/once.rs @@ -1,7 +1,7 @@ use std::cell::UnsafeCell; -use crate::ffi::*; use crate::context_and_sync_function; +use crate::ffi::*; /// A predicate used to execute a closure only once for the lifetime of an /// application. @@ -13,7 +13,9 @@ pub struct Once { impl Once { /// Creates a new `Once`. pub const fn new() -> Once { - Once { predicate: UnsafeCell::new(0) } + Once { + predicate: UnsafeCell::new(0), + } } /// Executes a closure once, ensuring that no other closure has been or @@ -22,11 +24,16 @@ impl Once { /// If called simultaneously from multiple threads, waits synchronously // until the work has completed. #[inline(always)] - pub fn call_once(&'static self, work: F) where F: FnOnce() { + pub fn call_once(&'static self, work: F) + where + F: FnOnce(), + { #[cold] #[inline(never)] fn once(predicate: *mut dispatch_once_t, work: F) - where F: FnOnce() { + where + F: FnOnce(), + { let mut work = Some(work); let (context, work) = context_and_sync_function(&mut work); unsafe { @@ -43,7 +50,13 @@ impl Once { } } -unsafe impl Sync for Once { } +impl Default for Once { + fn default() -> Self { + Self::new() + } +} + +unsafe impl Sync for Once {} #[cfg(test)] mod tests { diff --git a/src/queue.rs b/src/queue.rs index 46ed9cd..2f43c62 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -6,8 +6,7 @@ use std::time::Duration; use crate::ffi::*; use crate::{ - context_and_function, context_and_sync_function, context_and_apply_function, - time_after_delay, + context_and_apply_function, context_and_function, context_and_sync_function, time_after_delay, }; /// The type of a dispatch queue. @@ -59,9 +58,9 @@ pub enum QueuePriority { impl QueuePriority { fn as_raw(&self) -> c_long { match *self { - QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, - QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, - QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, + QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, + QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, + QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, QueuePriority::Background => DISPATCH_QUEUE_PRIORITY_BACKGROUND, } } @@ -100,9 +99,7 @@ impl Queue { /// Creates a new dispatch `Queue`. pub fn create(label: &str, attr: QueueAttribute) -> Self { let label = CString::new(label).unwrap(); - let queue = unsafe { - dispatch_queue_create(label.as_ptr(), attr.as_raw()) - }; + let queue = unsafe { dispatch_queue_create(label.as_ptr(), attr.as_raw()) }; Queue { ptr: queue } } @@ -111,8 +108,7 @@ impl Queue { /// A dispatch queue's priority is inherited from its target queue. /// Additionally, if both the queue and its target are serial queues, /// their blocks will not be invoked concurrently. - pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) - -> Self { + pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) -> Self { let queue = Queue::create(label, attr); unsafe { dispatch_set_target_queue(queue.ptr, target.ptr); @@ -134,7 +130,10 @@ impl Queue { /// Submits a closure for execution on self and waits until it completes. pub fn exec_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -154,7 +153,10 @@ impl Queue { /// Submits a closure for asynchronous execution on self and returns /// immediately. - pub fn exec_async(&self, work: F) where F: 'static + Send + FnOnce() { + pub fn exec_async(&self, work: F) + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_async_f(self.ptr, context, work); @@ -164,7 +166,9 @@ impl Queue { /// After the specified delay, submits a closure for asynchronous execution /// on self. pub fn exec_after(&self, delay: Duration, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let when = time_after_delay(delay); let (context, work) = context_and_function(work); unsafe { @@ -175,7 +179,9 @@ impl Queue { /// Submits a closure to be executed on self the given number of iterations /// and waits until it completes. pub fn apply(&self, iterations: usize, work: F) - where F: Sync + Fn(usize) { + where + F: Sync + Fn(usize), + { let (context, work) = context_and_apply_function(&work); unsafe { dispatch_apply_f(iterations, self.ptr, context, work); @@ -185,10 +191,13 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided slice and waits until it completes. pub fn for_each(&self, slice: &mut [T], work: F) - where F: Sync + Fn(&mut T), T: Send { + where + F: Sync + Fn(&mut T), + T: Send, + { let slice_ptr = slice.as_mut_ptr(); let work = move |i| unsafe { - work(&mut *slice_ptr.offset(i as isize)); + work(&mut *slice_ptr.add(i)); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -199,7 +208,11 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided vector and returns a `Vec` of the mapped elements. pub fn map(&self, vec: Vec, work: F) -> Vec - where F: Sync + Fn(T) -> U, T: Send, U: Send { + where + F: Sync + Fn(T) -> U, + T: Send, + U: Send, + { let mut src = vec; let len = src.len(); let src_ptr = src.as_ptr(); @@ -208,8 +221,8 @@ impl Queue { let dest_ptr = dest.as_mut_ptr(); let work = move |i| unsafe { - let result = work(ptr::read(src_ptr.offset(i as isize))); - ptr::write(dest_ptr.offset(i as isize), result); + let result = work(ptr::read(src_ptr.add(i))); + ptr::write(dest_ptr.add(i), result); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -234,7 +247,10 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `sync` method. pub fn barrier_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { + where + F: Send + FnOnce() -> T, + T: Send, + { let mut result = None; { let result_ref = &mut result; @@ -265,7 +281,9 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `async` method. pub fn barrier_async(&self, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_barrier_async_f(self.ptr, context, work); @@ -283,8 +301,8 @@ impl Queue { } } -unsafe impl Sync for Queue { } -unsafe impl Send for Queue { } +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} impl Clone for Queue { fn clone(&self) -> Self { @@ -314,11 +332,13 @@ impl SuspendGuard { unsafe { dispatch_suspend(queue.ptr); } - SuspendGuard { queue: queue.clone() } + SuspendGuard { + queue: queue.clone(), + } } /// Drops self, allowing the suspended `Queue` to resume. - pub fn resume(self) { } + pub fn resume(self) {} } impl Clone for SuspendGuard { @@ -337,10 +357,10 @@ impl Drop for SuspendGuard { #[cfg(test)] mod tests { + use super::*; + use crate::Group; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; - use crate::Group; - use super::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); diff --git a/src/sem.rs b/src/sem.rs index b6faa24..bb70826 100644 --- a/src/sem.rs +++ b/src/sem.rs @@ -18,26 +18,20 @@ impl Semaphore { /// successful calls to `wait` than `signal`, the system assumes the /// `Semaphore` is still in use and will abort if it is disposed. pub fn new(value: u32) -> Self { - let ptr = unsafe { - dispatch_semaphore_create(value as c_long) - }; + let ptr = unsafe { dispatch_semaphore_create(value as c_long) }; Semaphore { ptr } } /// Wait for (decrement) self. pub fn wait(&self) { - let result = unsafe { - dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch semaphore wait errored"); } /// Wait for (decrement) self until the specified timeout has elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_semaphore_wait(self.ptr, when) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -50,9 +44,7 @@ impl Semaphore { /// If the previous value was less than zero, this method wakes a waiting thread. /// Returns `true` if a thread is woken or `false` otherwise. pub fn signal(&self) -> bool { - unsafe { - dispatch_semaphore_signal(self.ptr) != 0 - } + unsafe { dispatch_semaphore_signal(self.ptr) != 0 } } /// Wait to access a resource protected by self. @@ -64,8 +56,7 @@ impl Semaphore { /// Wait until the specified timeout to access a resource protected by self. /// This decrements self and returns a guard that increments when dropped. - pub fn access_timeout(&self, timeout: Duration) - -> Result { + pub fn access_timeout(&self, timeout: Duration) -> Result { self.wait_timeout(timeout)?; Ok(SemaphoreGuard::new(self.clone())) } @@ -103,7 +94,7 @@ impl SemaphoreGuard { } /// Drops self, signaling the `Semaphore`. - pub fn signal(self) { } + pub fn signal(self) {} } impl Drop for SemaphoreGuard { diff --git a/src/source.rs b/src/source.rs index 161df2f..f2bfd8f 100644 --- a/src/source.rs +++ b/src/source.rs @@ -10,12 +10,13 @@ use crate::time_after_delay; /// Source: https://stackoverflow.com/a/32270215/5214809 extern "C" fn timer_handler(arg: *mut c_void) { - let closure: &mut Box = unsafe { transmute(arg) }; + let closure: &mut Box = + unsafe { &mut *(arg as *mut std::boxed::Box) }; closure(); } /// A timer node. -/// +/// /// Schedule a timer to run a block of code after a delay using `TimerNode::schedule`. /// Drop the timer to cancel it. pub struct TimerNode { @@ -42,7 +43,12 @@ impl TimerNode { /// # Returns /// A `Result` containing either a `TimerNode` or an `Error`. /// - pub fn schedule(interval: Duration, delay: Duration, leeway: Option, block: F) -> Result + pub fn schedule( + interval: Duration, + delay: Duration, + leeway: Option, + block: F, + ) -> Result where F: FnMut() + Send, { @@ -61,12 +67,23 @@ impl TimerNode { let when = time_after_delay(delay); let leeway = leeway.unwrap_or(Duration::from_millis(0)); unsafe { - dispatch_set_context(timer, transmute(context)); + dispatch_set_context( + timer, + transmute::< + std::boxed::Box>, + *mut std::ffi::c_void, + >(context), + ); dispatch_source_set_event_handler_f(timer, timer_handler); - dispatch_source_set_timer(timer, when, interval.as_nanos() as u64, leeway.as_nanos() as u64); + dispatch_source_set_timer( + timer, + when, + interval.as_nanos() as u64, + leeway.as_nanos() as u64, + ); dispatch_resume(timer); } - let node = TimerNode { timer: timer }; + let node = TimerNode { timer }; Ok(node) } @@ -81,7 +98,12 @@ impl TimerNode { let leeway = leeway.unwrap_or(Duration::from_millis(0)); unsafe { dispatch_suspend(self.timer); - dispatch_source_set_timer(self.timer, when, interval.as_nanos() as u64, leeway.as_nanos() as u64); + dispatch_source_set_timer( + self.timer, + when, + interval.as_nanos() as u64, + leeway.as_nanos() as u64, + ); dispatch_resume(self.timer); }; } @@ -119,8 +141,13 @@ mod test { *count += 1; println!("Hello, counter! {}", *count); }; - let _node = - TimerNode::schedule(Duration::from_millis(10), Duration::from_secs(0), None, block).unwrap(); + let _node = TimerNode::schedule( + Duration::from_millis(10), + Duration::from_secs(0), + None, + block, + ) + .unwrap(); thread::sleep(Duration::from_millis(100)); assert!(*count.lock().unwrap() >= 10); }