From 11aaad737128a5f7115c4582601217dca950268d Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Tue, 4 Sep 2018 16:17:12 +0800 Subject: [PATCH] block based operations of Queue --- Cargo.toml | 1 + src/blk.rs | 416 +++++++++++++++++++++++++++++++++++++++++-- src/data.rs | 60 ++++--- src/ffi.rs | 86 +++++++-- src/io.rs | 86 +++++---- src/lib.rs | 44 ++--- src/sem.rs | 10 +- src/time.rs | 71 +++++--- tests-ios/prelude.rs | 25 ++- 9 files changed, 653 insertions(+), 146 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7359dbb..c9e6833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ exclude = [ [dependencies] log = "0.4" +bitflags = "1.0" libc = "0.2" block = "0.1" diff --git a/src/blk.rs b/src/blk.rs index 06d618b..d980e07 100644 --- a/src/blk.rs +++ b/src/blk.rs @@ -1,17 +1,48 @@ +use std::cell::RefCell; +use std::fmt; use std::os::raw::c_void; +use std::sync::Arc; use ffi::*; -use {Queue, Timeout, WaitTimeout}; +use {Group, IntoTimeout, QosClass, Queue, WaitTimeout}; + +bitflags! { + /// Flags to pass to the `DispatchBlock::create()` functions. + pub struct BlockFlags: dispatch_block_flags_t { + /// Flag indicating that a dispatch block object should act as a barrier block + /// when submitted to a DISPATCH_QUEUE_CONCURRENT queue. + const BARRIER = 0x1; + /// Flag indicating that a dispatch block object should execute disassociated + /// from current execution context attributes such as QOS class, os_activity_t + /// and properties of the current IPC request (if any). + const DETACHED = 0x2; + /// Flag indicating that a dispatch block object should be assigned the execution + /// context attributes that are current at the time the block object is created. + const ASSIGN_CURRENT = 0x4; + /// Flag indicating that a dispatch block object should be not be assigned a QOS class. + const NO_QOS_CLASS = 0x8; + /// Flag indicating that execution of a dispatch block object submitted to + /// a queue should prefer the QOS class assigned to the queue over the QOS class + /// assigned to the block (resp. associated with the block at the time of submission). + const INHERIT_QOS_CLASS = 0x10; + /// Flag indicating that execution of a dispatch block object submitted to + /// a queue should prefer the QOS class assigned to the block (resp. associated + /// with the block at the time of submission) over the QOS class assigned to + /// the queue, as long as doing so will not result in a lower QOS class. + const ENFORCE_QOS_CLASS = 0x20; + } +} /// Creates, synchronously executes, and releases a dispatch block from the specified block and flags. -pub fn perform(flags: dispatch_block_flags_t, closure: F) +pub fn perform(flags: BlockFlags, closure: F) where F: 'static + Fn(), { - unsafe { dispatch_block_perform(flags, block(closure)) } + unsafe { dispatch_block_perform(flags.bits(), block(closure)) } } /// Dispatch blocks allow you to configure properties of individual units of work on a queue directly. +/// /// They also allow you to address individual work units for the purposes of waiting for their completion, /// getting notified about their completion, and/or canceling them. pub struct DispatchBlock { @@ -19,21 +50,29 @@ pub struct DispatchBlock { } impl DispatchBlock { - /// Creates a new dispatch block on the heap using an existing block and the given flags. - pub fn create(flags: dispatch_block_flags_t, closure: F) -> Self + /// Creates a new dispatch block on the heap using a closure. + pub fn new(closure: F) -> Self + where + F: 'static + Fn(), + { + Self::create(BlockFlags::INHERIT_QOS_CLASS, closure) + } + + /// Creates a new dispatch block on the heap using a closure and the given flags. + pub fn create(flags: BlockFlags, closure: F) -> Self where F: 'static + Fn(), { - let ptr = unsafe { dispatch_block_create(flags, block(closure)) }; + let ptr = unsafe { dispatch_block_create(flags.bits(), block(closure)) }; DispatchBlock { ptr } } - /// Creates a new dispatch block on the heap from an existing block and the given flags, + /// Creates a new dispatch block on the heap from a closure and the given flags, /// and assigns it the specified QoS class and relative priority. pub fn create_with_qos_class( - flags: dispatch_block_flags_t, - qos_class: dispatch_qos_class_t, + flags: BlockFlags, + qos_class: QosClass, relative_priority: i32, closure: F, ) -> Self @@ -42,8 +81,8 @@ impl DispatchBlock { { let ptr = unsafe { dispatch_block_create_with_qos_class( - flags, - qos_class, + flags.bits(), + qos_class as u32, relative_priority, block(closure), ) @@ -52,9 +91,19 @@ impl DispatchBlock { DispatchBlock { ptr } } + /// Extracts the raw `dispatch_block_t`. + pub fn as_raw(&self) -> dispatch_block_t { + self.ptr + } + + /// Consumes the `DispatchBlock`, returning the wrapped `dispatch_block_t`. + pub fn into_raw(self) -> dispatch_block_t { + self.ptr + } + /// Waits synchronously until execution of the specified dispatch block has completed or until the specified timeout has elapsed. - pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { - let when = timeout.as_raw(); + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.into_raw(); if unsafe { dispatch_block_wait(self.ptr, when) } == 0 { Ok(()) @@ -112,28 +161,204 @@ impl Drop for DispatchBlock { } } +impl From for DispatchBlock { + fn from(closure: F) -> Self { + DispatchBlock::new(closure) + } +} + +impl Queue { + /// Submits a closure for execution on self and waits until it completes. + pub fn sync_block(&self, flags: BlockFlags, work: F) -> T + where + F: 'static + Send + Fn() -> T, + T: 'static + Send + fmt::Debug, + { + let result = Arc::new(RefCell::new(None)); + { + let result_ref = result.clone(); + let work = move || { + *result_ref.borrow_mut() = Some(work()); + }; + let block = DispatchBlock::create(flags, work); + + unsafe { + dispatch_sync(self.ptr, block.clone().into_raw()); + } + } + // This was set so it's safe to unwrap + let result = result.borrow_mut().take(); + result.unwrap() + } + + /// Submits a closure for asynchronous execution on self and returns dispatch block immediately. + pub fn async_block(&self, work: B) -> DispatchBlock + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_async(self.ptr, block.clone().into_raw()); + } + + block + } + + /// After the specified delay, submits a closure for asynchronous execution + /// on self and returns dispatch block immediately. + pub fn after_block(&self, delay: T, work: B) -> DispatchBlock + where + T: IntoTimeout, + B: Into, + { + let when = delay.into_raw(); + let block = work.into(); + + unsafe { + dispatch_after(when, self.ptr, block.clone().into_raw()); + } + + block + } + + /// Submits a closure to be executed on self the given number of iterations + /// and waits until it completes. + pub fn apply_block(&self, iterations: usize, work: F) + where + F: 'static + Sync + Fn(usize), + { + let block = block(work); + + unsafe { + dispatch_apply(iterations, self.ptr, block); + } + } + + /// Submits a closure to be executed on self as a barrier and waits until + /// it completes. + /// + /// Barriers create synchronization points within a concurrent queue. + /// If self is concurrent, when it encounters a barrier it delays execution + /// of the closure (and any further ones) until all closures submitted + /// before the barrier finish executing. + /// At that point, the barrier closure executes by itself. + /// Upon completion, self resumes its normal execution behavior. + /// + /// If self is a serial queue or one of the global concurrent queues, + /// this method behaves like the normal `sync` method. + pub fn barrier_sync_block(&self, flags: BlockFlags, work: F) -> T + where + F: 'static + Send + Fn() -> T, + T: 'static + Send + fmt::Debug, + { + let result = Arc::new(RefCell::new(None)); + { + let result_ref = result.clone(); + let work = move || { + *result_ref.borrow_mut() = Some(work()); + }; + let block = DispatchBlock::create(flags, work); + + unsafe { + dispatch_barrier_sync(self.ptr, block.clone().into_raw()); + } + } + // This was set so it's safe to unwrap + let result = result.borrow_mut().take(); + result.unwrap() + } + + /// Submits a closure to be executed on self as a barrier and returns + /// a `DispatchBlock` immediately. + /// + /// Barriers create synchronization points within a concurrent queue. + /// If self is concurrent, when it encounters a barrier it delays execution + /// of the closure (and any further ones) until all closures submitted + /// before the barrier finish executing. + /// At that point, the barrier closure executes by itself. + /// Upon completion, self resumes its normal execution behavior. + /// + /// If self is a serial queue or one of the global concurrent queues, + /// this method behaves like the normal `async` method. + pub fn barrier_async_block(&self, work: B) -> DispatchBlock + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_barrier_async(self.ptr, block.clone().into_raw()); + } + + block + } +} + +impl Group { + /// Submits a closure asynchronously to the given `Queue` and associates it + /// with self and returns a `DispatchBlock` immediately. + pub fn async_block(&self, queue: &Queue, work: B) + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_group_async(self.ptr, queue.ptr, block.clone().into_raw()); + } + } + + /// Schedules a closure to be submitted to the given `Queue` when all tasks + /// associated with self have completed and returns a `DispatchBlock` immediately. + /// If self is empty, the closure is submitted immediately. + pub fn notify_block(&self, queue: &Queue, work: B) + where + B: Into, + { + let block = work.into(); + + unsafe { + dispatch_group_notify(self.ptr, queue.ptr, block.clone().into_raw()); + } + } +} + #[cfg(test)] mod tests { use std::cell::Cell; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; use super::*; + use QueueAttribute; + + fn async_increment_block(queue: &Queue, num: &Arc>) { + let num = num.clone(); + queue.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + }, + )); + } #[test] - fn test_perform() { + fn test_perform_block() { let n = Arc::new(Cell::new(0)); let c = n.clone(); - perform(DISPATCH_BLOCK_NO_QOS_CLASS, move || c.set(123)); + perform(BlockFlags::NO_QOS_CLASS, move || c.set(123)); assert_eq!(n.get(), 123); } #[test] - fn test_block() { + fn test_block_block() { let n = Arc::new(Cell::new(0)); let c = n.clone(); - let block = DispatchBlock::create(DISPATCH_BLOCK_NO_QOS_CLASS, move || c.set(123)); + let block = DispatchBlock::create(BlockFlags::NO_QOS_CLASS, move || c.set(123)); assert!(!block.canceled()); @@ -147,4 +372,159 @@ mod tests { assert!(!block.done()); } + + #[test] + fn test_serial_queue_block() { + let q = Queue::create("", QueueAttribute::Serial); + let mut num = 0; + + q.sync(|| num = 1); + assert_eq!(num, 1); + assert_eq!(q.qos_class(), (QosClass::Unspecified, 0)); + + assert_eq!(q.sync_block(BlockFlags::ASSIGN_CURRENT, move || num), 1); + } + + #[test] + fn test_serial_queue_async_block() { + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + + // Sync an empty block to ensure the async one finishes + q.sync(|| ()); + assert_eq!(*num.lock().unwrap(), 1); + } + + #[test] + fn test_after_block() { + let q = Queue::create("", QueueAttribute::Serial); + let group = Group::create(); + let num = Arc::new(Mutex::new(0)); + + let delay = Duration::from_millis(0); + let num2 = num.clone(); + let start = Instant::now(); + { + let group = group.clone(); + let guard = RefCell::new(Some(group.enter())); + q.after_block(delay, move || { + let mut num = num2.lock().unwrap(); + *num = 1; + guard.borrow_mut().take().unwrap().leave(); + }); + } + + // Wait for the previous block to complete + assert!(group.wait_timeout(Duration::from_millis(5000))); + assert!(start.elapsed() >= delay); + assert_eq!(*num.lock().unwrap(), 1); + } + + #[test] + fn test_apply_block() { + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + { + let num = num.clone(); + q.apply_block(5, move |_| *num.lock().unwrap() += 1); + } + assert_eq!(*num.lock().unwrap(), 5); + } + + #[test] + fn test_barrier_sync_block() { + let q = Queue::create("", QueueAttribute::Concurrent); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + let num2 = num.clone(); + let result = q.barrier_sync_block(BlockFlags::ASSIGN_CURRENT, move || { + let mut num = num2.lock().unwrap(); + if *num == 2 { + *num = 10; + } + *num + }); + assert_eq!(result, 10); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + q.barrier_sync(|| ()); + assert_eq!(*num.lock().unwrap(), 12); + } + + #[test] + fn test_barrier_async_block() { + let q = Queue::create("", QueueAttribute::Concurrent); + let num = Arc::new(Mutex::new(0)); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + let num2 = num.clone(); + q.barrier_async_block(move || { + let mut num = num2.lock().unwrap(); + if *num == 2 { + *num = 10; + } + }); + + async_increment_block(&q, &num); + async_increment_block(&q, &num); + + q.barrier_sync(|| ()); + assert_eq!(*num.lock().unwrap(), 12); + } + + #[test] + fn test_group_block() { + let group = Group::create(); + let q = Queue::create("", QueueAttribute::Serial); + let num = Arc::new(Mutex::new(0)); + { + let num = num.clone(); + group.async_block(&q, move || { + let mut num = num.lock().unwrap(); + *num += 1; + }); + } + + { + let group = group.clone(); + let guard = RefCell::new(Some(group.enter())); + let num = num.clone(); + q.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + guard.borrow_mut().take().unwrap().leave(); + }, + )); + } + + let notify_group = Group::create(); + + { + let guard = RefCell::new(Some(notify_group.enter())); + let num = num.clone(); + group.notify_block(&q, move || { + let mut num = num.lock().unwrap(); + *num *= 5; + guard.borrow_mut().take().unwrap().leave(); + }); + } + + // Wait for the notify block to finish + notify_group.wait(); + // If the notify ran, the group should be empty + assert!(group.is_empty()); + // The notify must have run after the two blocks of the group + assert_eq!(*num.lock().unwrap(), 10); + } } diff --git a/src/data.rs b/src/data.rs index 8272bb2..4ce762e 100644 --- a/src/data.rs +++ b/src/data.rs @@ -7,41 +7,53 @@ use ffi::*; use Queue; /// The destructor responsible for freeing the data when it is no longer needed. -pub trait Destructor { - /// Extracts the raw `dispatch_block_t`. - fn as_raw(self) -> dispatch_block_t; +pub trait IntoDestructor { + /// Consumes the `Destructor`, returning the raw `dispatch_block_t`. + fn into_raw(self) -> dispatch_block_t; } -impl Destructor for dispatch_block_t { - fn as_raw(self) -> dispatch_block_t { +impl IntoDestructor for dispatch_block_t { + fn into_raw(self) -> dispatch_block_t { self } } -impl Destructor for F { - fn as_raw(self) -> dispatch_block_t { +impl IntoDestructor for F { + fn into_raw(self) -> dispatch_block_t { block(self) } } -/// The default destructor for dispatch data objects. -/// Used at data object creation to indicate that the supplied buffer -/// should be copied into internal storage managed by the system. -pub fn dispatch_data_destructor_default() -> dispatch_block_t { - ptr::null() +/// The build-in destructor responsible for freeing the data when it is no longer needed. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Destructor { + /// The default destructor for dispatch data objects. + /// Used at data object creation to indicate that the supplied buffer + /// should be copied into internal storage managed by the system. + Default, + /// The destructor for dispatch data objects created from a malloc'd buffer. + /// Used at data object creation to indicate that the supplied buffer + /// was allocated by the malloc() family and should be destroyed with free(3). + Free, + /// The destructor for dispatch data objects that have been created + /// from buffers that require deallocation with munmap(2). + Munmap, } -/// The destructor for dispatch data objects created from a malloc'd buffer. -/// Used at data object creation to indicate that the supplied buffer -/// was allocated by the malloc() family and should be destroyed with free(3). -pub fn dispatch_data_destructor_free() -> dispatch_block_t { - unsafe { _dispatch_data_destructor_free } +impl Default for Destructor { + fn default() -> Self { + Destructor::Default + } } -/// The destructor for dispatch data objects that have been created -/// from buffers that require deallocation with munmap(2). -pub fn dispatch_data_destructor_munmap() -> dispatch_block_t { - unsafe { _dispatch_data_destructor_free } +impl IntoDestructor for Destructor { + fn into_raw(self) -> dispatch_block_t { + match self { + Destructor::Default => ptr::null(), + Destructor::Free => unsafe { _dispatch_data_destructor_free }, + Destructor::Munmap => unsafe { _dispatch_data_destructor_free }, + } + } } /// An immutable object representing a contiguous or sparse region of memory. @@ -67,14 +79,14 @@ impl Data { } /// Creates a new dispatch data object with the specified memory buffer and destructor. - pub fn create_with_destructor( + pub fn create_with_destructor( queue: &Queue, buffer: *const c_void, size: usize, destructor: F, ) -> Self { let ptr = - unsafe { dispatch_data_create(buffer, size, queue.as_raw(), destructor.as_raw()) }; + unsafe { dispatch_data_create(buffer, size, queue.as_raw(), destructor.into_raw()) }; debug!("create data with {} bytes, data: {:?}", size, ptr); @@ -288,7 +300,7 @@ mod tests { let queue = Queue::main(); let p = unsafe { libc::malloc(8) } as *const c_void; - let data = Data::create_with_destructor(&queue, p, 8, dispatch_data_destructor_free()); + let data = Data::create_with_destructor(&queue, p, 8, Destructor::Free); assert_eq!(data.len(), 8); diff --git a/src/ffi.rs b/src/ffi.rs index 6f01158..d34ac16 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -29,7 +29,7 @@ pub type dispatch_once_t = c_long; pub type dispatch_queue_t = *mut dispatch_object_s; pub type dispatch_time_t = u64; // dispatch_source_type_t -pub type dispatch_fd_t = u64; +pub type dispatch_fd_t = c_int; pub type dispatch_data_handler_t = *const Block<(dispatch_data_t, c_int), ()>; pub type dispatch_data_t = *mut dispatch_object_s; pub type dispatch_data_applier_t = @@ -37,11 +37,11 @@ pub type dispatch_data_applier_t = pub type dispatch_io_t = *mut dispatch_object_s; pub type dispatch_io_handler_t = *const Block<(bool, dispatch_data_t, c_int), ()>; pub type dispatch_cleanup_handler_t = *const Block<(c_int,), ()>; -pub type dispatch_io_type_t = u64; -pub type dispatch_io_close_flags_t = u64; -pub type dispatch_io_interval_flags_t = u64; +pub type dispatch_io_type_t = c_ulong; +pub type dispatch_io_close_flags_t = c_ulong; +pub type dispatch_io_interval_flags_t = c_ulong; pub type dispatch_queue_attr_t = *const dispatch_object_s; -pub type dispatch_block_flags_t = u64; +pub type dispatch_block_flags_t = c_ulong; pub type dispatch_qos_class_t = c_uint; #[cfg_attr(any(target_os = "macos", target_os = "ios"), link(name = "System", kind = "dylib"))] @@ -83,40 +83,48 @@ extern "C" { pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t); pub fn dispatch_main(); - // void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_async(queue: dispatch_queue_t, block: dispatch_block_t); pub fn dispatch_async_f( queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t, ); - // void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_sync(queue: dispatch_queue_t, block: dispatch_block_t); pub fn dispatch_sync_f( queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t, ); - // void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_after(when: dispatch_time_t, queue: dispatch_queue_t, block: dispatch_block_t); pub fn dispatch_after_f( when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t, ); - // void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) ); + pub fn dispatch_apply( + iterations: usize, + queue: dispatch_queue_t, + block: *const Block<(usize,), ()>, + ); pub fn dispatch_apply_f( iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern "C" fn(*mut c_void, usize), ); - // void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block ); + pub fn dispatch_once(predicate: *mut dispatch_once_t, block: dispatch_block_t); pub fn dispatch_once_f( predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t, ); - // void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_group_async( + group: dispatch_group_t, + queue: dispatch_queue_t, + block: dispatch_block_t, + ); pub fn dispatch_group_async_f( group: dispatch_group_t, queue: dispatch_queue_t, @@ -126,7 +134,11 @@ extern "C" { pub fn dispatch_group_create() -> dispatch_group_t; pub fn dispatch_group_enter(group: dispatch_group_t); pub fn dispatch_group_leave(group: dispatch_group_t); - // void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_group_notify( + group: dispatch_group_t, + queue: dispatch_queue_t, + block: dispatch_block_t, + ); pub fn dispatch_group_notify_f( group: dispatch_group_t, queue: dispatch_queue_t, @@ -148,13 +160,13 @@ extern "C" { pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long; - // void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_barrier_async(queue: dispatch_queue_t, block: dispatch_block_t); pub fn dispatch_barrier_async_f( queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t, ); - // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); + pub fn dispatch_barrier_sync(queue: dispatch_queue_t, block: dispatch_block_t); pub fn dispatch_barrier_sync_f( queue: dispatch_queue_t, context: *mut c_void, @@ -295,26 +307,57 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t { unsafe { &_dispatch_main_q as *const _ as dispatch_queue_t } } +/// A QOS class which indicates work performed by this thread is interactive with the user. pub const QOS_CLASS_USER_INTERACTIVE: dispatch_qos_class_t = 0x21; +/// A QOS class which indicates work performed by this thread was initiated by the user +/// and that the user is likely waiting for the results. pub const QOS_CLASS_USER_INITIATED: dispatch_qos_class_t = 0x19; +/// A default QOS class used by the system in cases where more specific QOS class information is not available. pub const QOS_CLASS_DEFAULT: dispatch_qos_class_t = 0x15; +/// A QOS class which indicates work performed by this thread may or may not be initiated by the user +/// and that the user is unlikely to be immediately waiting for the results. pub const QOS_CLASS_UTILITY: dispatch_qos_class_t = 0x11; +/// A QOS class which indicates work performed by this thread was not initiated by the user +/// and that the user may be unaware of the results. pub const QOS_CLASS_BACKGROUND: dispatch_qos_class_t = 0x09; +/// A QOS class value which indicates the absence or removal of QOS class information. pub const QOS_CLASS_UNSPECIFIED: dispatch_qos_class_t = 0x00; +/// The queue executes blocks serially in FIFO order. pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; +/// The queue executes blocks concurrently. pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; +/// Items dispatched to the queue will run at high priority, +/// i.e. the queue will be scheduled for execution +/// before any default priority or low priority queue. pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; +/// Items dispatched to the queue will run at the default priority, +/// i.e. the queue will be scheduled for execution +/// after all high priority queues have been scheduled, +/// but before any low priority queues have been scheduled. pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; +/// Items dispatched to the queue will run at low priority, +/// i.e. the queue will be scheduled for execution +/// after all default priority and high priority queues have been scheduled. pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +/// Items dispatched to the queue will run at background priority, +/// i.e. the queue will be scheduled for execution +/// after all higher priority queues have been scheduled +/// and the system will run items on this queue on a thread +/// with background status as per setpriority(2) +/// (i.e. disk I/O is throttled and the thread's scheduling priority is set to lowest value). pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; +/// A `dispatch_time_t` corresponding to the current time. pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +/// A `dispatch_time_t` corresponding to the maximum time. pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; +/// A dispatch I/O channel representing a stream of bytes. pub const DISPATCH_IO_STREAM: dispatch_io_type_t = 0; +/// A dispatch I/O channel representing a random access file. pub const DISPATCH_IO_RANDOM: dispatch_io_type_t = 1; /// Stop outstanding operations on a channel when the channel is closed. @@ -325,11 +368,26 @@ pub const DISPATCH_IO_STOP: dispatch_io_close_flags_t = 0x1; /// is inferior to the low water mark (or zero). pub const DISPATCH_IO_STRICT_INTERVAL: dispatch_io_interval_flags_t = 0x1; +/// Flag indicating that a dispatch block object should act as a barrier block +/// when submitted to a DISPATCH_QUEUE_CONCURRENT queue. pub const DISPATCH_BLOCK_BARRIER: dispatch_block_flags_t = 0x1; +/// Flag indicating that a dispatch block object should execute disassociated +/// from current execution context attributes such as QOS class, os_activity_t +/// and properties of the current IPC request (if any). pub const DISPATCH_BLOCK_DETACHED: dispatch_block_flags_t = 0x2; +/// Flag indicating that a dispatch block object should be assigned the execution +/// context attributes that are current at the time the block object is created. pub const DISPATCH_BLOCK_ASSIGN_CURRENT: dispatch_block_flags_t = 0x4; +/// Flag indicating that a dispatch block object should be not be assigned a QOS class. pub const DISPATCH_BLOCK_NO_QOS_CLASS: dispatch_block_flags_t = 0x8; +/// Flag indicating that execution of a dispatch block object submitted to +/// a queue should prefer the QOS class assigned to the queue over the QOS class +/// assigned to the block (resp. associated with the block at the time of submission). pub const DISPATCH_BLOCK_INHERIT_QOS_CLASS: dispatch_block_flags_t = 0x10; +/// Flag indicating that execution of a dispatch block object submitted to +/// a queue should prefer the QOS class assigned to the block (resp. associated +/// with the block at the time of submission) over the QOS class assigned to +/// the queue, as long as doing so will not result in a lower QOS class. pub const DISPATCH_BLOCK_ENFORCE_QOS_CLASS: dispatch_block_flags_t = 0x20; #[cfg(test)] diff --git a/src/io.rs b/src/io.rs index 4828b88..b99b733 100644 --- a/src/io.rs +++ b/src/io.rs @@ -12,21 +12,36 @@ use libc::{mode_t, off_t}; use ffi::*; use {Data, Queue}; -#[derive(Clone, Debug)] -pub enum ChannelType { - Stream, - Random, +bitflags! { + /// The type of flags you can set on a `Channel::close()` call + pub struct CloseFlags: dispatch_io_close_flags_t { + /// Stop outstanding operations on a channel when the channel is closed. + const STOP = DISPATCH_IO_STOP; + } } -impl ChannelType { - pub fn as_raw(&self) -> dispatch_io_type_t { - match self { - ChannelType::Stream => DISPATCH_IO_STREAM, - ChannelType::Random => DISPATCH_IO_RANDOM, - } +bitflags! { + /// Type of flags to set on `Channel::set_interval()` call + pub struct IntervalFlags: dispatch_io_interval_flags_t { + /// Enqueue I/O handlers at a channel's interval setting + /// even if the amount of data ready to be delivered is inferior to + /// the low water mark (or zero). + const STRICT_INTERVAL = DISPATCH_IO_STRICT_INTERVAL; } } +/// The type of a dispatch I/O channel. +#[cfg_attr(target_os = "macos", repr(u64))] +#[cfg_attr(target_os = "ios", repr(u32))] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ChannelType { + /// A dispatch I/O channel representing a stream of bytes. + Stream = DISPATCH_IO_STREAM, + /// A dispatch I/O channel representing a random access file. + Random = DISPATCH_IO_RANDOM, +} + +/// A dispatch I/O channel represents the asynchronous I/O policy applied to a file descriptor. pub struct Channel { ptr: dispatch_io_t, } @@ -52,7 +67,7 @@ impl Channel { let ptr = unsafe { dispatch_io_create( - channel_type.as_raw(), + channel_type as dispatch_io_type_t, fd, queue.ptr, cleanup_handler.map_or(ptr::null(), block), @@ -97,7 +112,7 @@ impl Channel { let s = unsafe { CStr::from_bytes_with_nul_unchecked(&v) }; let ptr = unsafe { dispatch_io_create_with_path( - channel_type.as_raw(), + channel_type as dispatch_io_type_t, s.as_ptr(), flags, mode, @@ -121,7 +136,7 @@ impl Channel { /// /// The new channel inherits the file descriptor or path name associated with the existing channel, /// but not its channel type or policies. - pub fn open_stream(&self, queue: &Queue, cleanup_handler: Option) -> Self + pub fn open_as_stream(&self, queue: &Queue, cleanup_handler: Option) -> Self where H: 'static + Fn(i32), { @@ -148,7 +163,7 @@ impl Channel { /// /// The new channel inherits the file descriptor or path name associated with the existing channel, /// but not its channel type or policies. - pub fn open_file(&self, queue: &Queue, cleanup_handler: Option) -> Self + pub fn open_as_file(&self, queue: &Queue, cleanup_handler: Option) -> Self where H: 'static + Fn(i32), { @@ -174,9 +189,9 @@ impl Channel { /// Schedule a read operation for asynchronous execution on the specified I/O channel. /// The I/O handler is enqueued one or more times depending on the general load of the system /// and the policy specified on the I/O channel. - pub fn read(&self, offset: isize, length: usize, queue: &Queue, io_handler: H) + pub fn async_read(&self, offset: isize, length: usize, queue: &Queue, io_handler: H) where - H: 'static + Fn(io::Result), + H: 'static + Fn(io::Result<(Data, bool)>), { debug!( "read {} bytes at offset {}, channel: {:?}", @@ -185,14 +200,14 @@ impl Channel { let fd = self.descriptor(); let handler = block(move |done: bool, ptr: dispatch_data_t, error: i32| { - let result = if done { + let result = if error == 0 { unsafe { dispatch_retain(ptr) }; let data = Data::borrow(ptr); trace!("read {} bytes with fd: {:?}", data.len(), fd); - Ok(data) + Ok((data, done)) } else { trace!("read failed, error: {}", error); @@ -208,9 +223,9 @@ impl Channel { /// Schedule a write operation for asynchronous execution on the specified I/O channel. /// The I/O handler is enqueued one or more times depending on the general load of the system /// and the policy specified on the I/O channel. - pub fn write(&self, offset: isize, data: &Data, queue: &Queue, io_handler: H) + pub fn async_write(&self, offset: isize, data: &Data, queue: &Queue, io_handler: H) where - H: 'static + Fn(io::Result), + H: 'static + Fn(io::Result<(Data, bool)>), { debug!( "write {} bytes at offset {}, channel: {:?}", @@ -230,7 +245,7 @@ impl Channel { data.len() ); - Ok(data) + Ok((data, done)) } else { trace!("write failed, error: {}", error); @@ -253,10 +268,10 @@ impl Channel { /// Close the specified I/O channel to new read or write operations; /// scheduling operations on a closed channel results in their handler returning an error. - pub fn close(&self, flags: dispatch_io_close_flags_t) { + pub fn close(&self, flags: CloseFlags) { debug!("close channel: {:?}", self.ptr); - unsafe { dispatch_io_close(self.ptr, flags) } + unsafe { dispatch_io_close(self.ptr, flags.bits()) } } /// Schedule a barrier operation on the specified I/O channel; @@ -301,7 +316,7 @@ impl Channel { /// Set a interval at which I/O handlers are to be enqueued /// on the I/O channel for all operations. - pub fn set_interval(&self, internal: Duration, flags: dispatch_io_interval_flags_t) { + pub fn set_interval(&self, internal: Duration, flags: IntervalFlags) { debug!("set internal to {:?}, channel: {:?}", internal, self.ptr); unsafe { @@ -312,7 +327,7 @@ impl Channel { .checked_mul(1_000_000_000) .and_then(|dur| dur.checked_add(internal.subsec_nanos() as u64)) .unwrap_or(u64::max_value()), - flags, + flags.bits(), ) } } @@ -374,7 +389,7 @@ impl Queue { /// Schedule a read operation for asynchronous execution on the specified file descriptor. /// The specified handler is enqueued with the data read from the file descriptor /// when the operation has completed or an error occurs. - pub fn read(&self, f: &F, length: usize, handler: H) + pub fn async_read(&self, f: &F, length: usize, handler: H) where F: AsRawFd, H: 'static + Fn(io::Result), @@ -404,7 +419,7 @@ impl Queue { /// Schedule a write operation for asynchronous execution on the specified file descriptor. /// The specified handler is enqueued when the operation has completed or an error occurs. - pub fn write(&self, f: &F, data: &Data, handler: H) + pub fn async_write(&self, f: &F, data: &Data, handler: H) where F: AsRawFd, H: 'static + Fn(io::Result), @@ -459,13 +474,18 @@ mod tests { let data = q.data(b"hello world"); - c.write(0, &data, &q, move |result| { - assert!(result.unwrap().is_empty()); + c.async_write(0, &data, &q, move |result| match result { + Ok((data, done)) => { + assert!(data.is_empty()); + assert!(done); + } + Err(err) => warn!("write channel failed, {}", err), }); - c.read(0, 5, &q, move |result| match result { - Ok(data) => { + c.async_read(0, 5, &q, move |result| match result { + Ok((data, done)) => { assert_eq!(data.len(), 5); + assert!(done); } Err(err) => warn!("read channel failed, {}", err), }); @@ -482,7 +502,7 @@ mod tests { trace!("sync up channel finished"); - c.close(0); + c.close(CloseFlags::empty()); } #[test] @@ -497,7 +517,7 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let b = barrier.clone(); - q.read(&f, 5, move |result| { + q.async_read(&f, 5, move |result| { match result { Ok(data) => { assert_eq!(data.len(), 5); diff --git a/src/lib.rs b/src/lib.rs index 44b37c9..19726d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,8 @@ extern crate block; extern crate libc; #[macro_use] extern crate log; +#[macro_use] +extern crate bitflags; #[cfg(test)] extern crate pretty_env_logger; @@ -67,30 +69,27 @@ use ffi::*; /// Raw foreign function interface for libdispatch. pub mod ffi; -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "ios"))] mod blk; -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "ios"))] mod data; -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "ios"))] mod io; mod qos; -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "ios"))] mod sem; mod time; -#[cfg(target_os = "macos")] -pub use blk::{perform, DispatchBlock}; -#[cfg(target_os = "macos")] -pub use data::{ - dispatch_data_destructor_default, dispatch_data_destructor_free, - dispatch_data_destructor_munmap, Data, Destructor, -}; -#[cfg(target_os = "macos")] -pub use ffi::{DISPATCH_IO_STOP, DISPATCH_IO_STRICT_INTERVAL}; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use blk::{perform, BlockFlags, DispatchBlock}; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use data::{Data, Destructor, IntoDestructor}; +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub use io::{Channel, ChannelType, CloseFlags, IntervalFlags}; pub use qos::QosClass; -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "ios"))] pub use sem::Semaphore; -pub use time::{after, at, now, Timeout, WaitTimeout}; +pub use time::{after, at, IntoTimeout, WaitTimeout, FOREVER, NOW}; /// The type of a dispatch queue. #[derive(Debug, Hash, PartialEq)] @@ -145,7 +144,7 @@ impl QueueAttribute { /// Returns an attribute value which may be provided to `Queue::create` or `Queue::with_target_queue`, /// in order to make the created queue initially inactive. - #[cfg(target_os = "macos")] + #[cfg(any(target_os = "macos", target_os = "ios"))] pub fn inactive(self) -> Self { let attr = unsafe { dispatch_queue_attr_make_initially_inactive(self.as_raw()) }; @@ -154,7 +153,7 @@ impl QueueAttribute { /// Returns an attribute value which may be provided to `Queue::create` or `Queue::with_target_queue`, /// in order to assign a QOS class and relative priority to the queue. - #[cfg(target_os = "macos")] + #[cfg(any(target_os = "macos", target_os = "ios"))] pub fn with_qos_class(self, qos_class: QosClass, relative_priority: i32) -> Self { let attr = unsafe { dispatch_queue_attr_make_with_qos_class( @@ -315,6 +314,7 @@ impl Queue { } /// Returns the QOS class and relative priority of the given queue. + #[cfg(any(target_os = "macos", target_os = "ios"))] pub fn qos_class(&self) -> (QosClass, i32) { let mut relative_priority = 0; @@ -373,9 +373,9 @@ impl Queue { pub fn after(&self, delay: T, work: F) where F: 'static + Send + FnOnce(), - T: Timeout, + T: IntoTimeout, { - let when = delay.as_raw(); + let when = delay.into_raw(); let (context, work) = context_and_function(work); unsafe { dispatch_after_f(when, self.ptr, context, work); @@ -627,8 +627,8 @@ impl Group { /// Waits for all tasks associated with self to complete within the /// specified duration. /// Returns true if the tasks completed or false if the timeout elapsed. - pub fn wait_timeout(&self, timeout: T) -> bool { - let when = timeout.as_raw(); + pub fn wait_timeout(&self, timeout: T) -> bool { + let when = timeout.into_raw(); let result = unsafe { dispatch_group_wait(self.ptr, when) }; result == 0 } @@ -757,11 +757,11 @@ mod tests { q.sync(|| num = 1); assert_eq!(num, 1); - assert_eq!(q.qos_class(), (QosClass::Unspecified, 0)); assert_eq!(q.sync(|| num), 1); } + #[cfg(any(target_os = "macos", target_os = "ios"))] #[test] fn test_serial_queue_with_qos_class() { let q = Queue::create( diff --git a/src/sem.rs b/src/sem.rs index c06a2f2..6f5c0f8 100644 --- a/src/sem.rs +++ b/src/sem.rs @@ -1,5 +1,7 @@ +use std::os::raw::c_long; + use ffi::*; -use {Timeout, WaitTimeout}; +use {IntoTimeout, WaitTimeout}; /// A counting semaphore. /// @@ -18,7 +20,7 @@ impl Semaphore { /// Passing a value greater than zero is useful for managing a finite pool of resources, /// where the pool size is equal to the value. pub fn new(n: u64) -> Self { - let ptr = unsafe { dispatch_semaphore_create(n as i64) }; + let ptr = unsafe { dispatch_semaphore_create(n as c_long) }; Semaphore { ptr } } @@ -33,8 +35,8 @@ impl Semaphore { /// Wait (decrement) for a semaphoreor until the specified timeout has elapsed. /// /// Decrement the counting semaphore. - pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { - let when = timeout.as_raw(); + pub fn wait_timeout(&self, timeout: T) -> Result<(), WaitTimeout> { + let when = timeout.into_raw(); let n = unsafe { dispatch_semaphore_wait(self.ptr, when) }; diff --git a/src/time.rs b/src/time.rs index 403d2eb..5938305 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,67 +1,85 @@ -use std::ptr; +use std::error::Error; +use std::fmt; use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH}; use libc::{c_long, time_t, timespec}; use ffi::*; +/// A `dispatch_time_t` corresponding to the current time. +pub const NOW: dispatch_time_t = DISPATCH_TIME_NOW; +/// A `dispatch_time_t` corresponding to the maximum time. +pub const FOREVER: dispatch_time_t = DISPATCH_TIME_FOREVER; + /// A type indicating whether a timed wait on a dispatch object returned due to a time out or not. #[derive(Clone, Copy, Debug, PartialEq)] pub struct WaitTimeout; +impl fmt::Display for WaitTimeout { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "operation time out") + } +} + +impl Error for WaitTimeout { + fn description(&self) -> &str { + "operation time out" + } +} + /// When to timeout. -pub trait Timeout { - /// Extracts the raw `dispatch_time_t`. - fn as_raw(self) -> dispatch_time_t; +pub trait IntoTimeout { + /// Consumes the `IntoTimeout`, returning the raw `dispatch_time_t`. + fn into_raw(self) -> dispatch_time_t; } -impl Timeout for Option { - fn as_raw(self) -> dispatch_time_t { +impl IntoTimeout for Option { + fn into_raw(self) -> dispatch_time_t { if let Some(timeout) = self { - timeout.as_raw() + timeout.into_raw() } else { DISPATCH_TIME_NOW } } } -impl Timeout for i32 { - fn as_raw(self) -> dispatch_time_t { - Duration::from_millis(self as u64).as_raw() +impl IntoTimeout for i32 { + fn into_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).into_raw() } } -impl Timeout for u32 { - fn as_raw(self) -> dispatch_time_t { - Duration::from_millis(self as u64).as_raw() +impl IntoTimeout for u32 { + fn into_raw(self) -> dispatch_time_t { + Duration::from_millis(self as u64).into_raw() } } -impl Timeout for Duration { - fn as_raw(self) -> dispatch_time_t { +impl IntoTimeout for Duration { + fn into_raw(self) -> dispatch_time_t { after(self) } } -impl Timeout for dispatch_time_t { - fn as_raw(self) -> dispatch_time_t { +impl IntoTimeout for dispatch_time_t { + fn into_raw(self) -> dispatch_time_t { self } } -impl Timeout for Instant { - fn as_raw(self) -> dispatch_time_t { - self.duration_since(Instant::now()).as_raw() +impl IntoTimeout for Instant { + fn into_raw(self) -> dispatch_time_t { + self.duration_since(Instant::now()).into_raw() } } -impl Timeout for SystemTime { - fn as_raw(self) -> dispatch_time_t { - self.duration_since(SystemTime::now()).unwrap().as_raw() +impl IntoTimeout for SystemTime { + fn into_raw(self) -> dispatch_time_t { + self.duration_since(SystemTime::now()).unwrap().into_raw() } } -/// Returns a `dispatch_time_t` corresponding to the given time. +/// Returns a `dispatch_time_t` corresponding to the given duration. pub fn after(delay: Duration) -> dispatch_time_t { delay .as_secs() @@ -79,11 +97,6 @@ pub fn after(delay: Duration) -> dispatch_time_t { }) } -/// Returns a `dispatch_time_t` corresponding to the wall time. -pub fn now() -> dispatch_time_t { - unsafe { dispatch_walltime(ptr::null(), 0) } -} - /// Returns a `dispatch_time_t` corresponding to the given time. pub fn at(tm: SystemTime) -> Result { let dur = tm.duration_since(UNIX_EPOCH)?; diff --git a/tests-ios/prelude.rs b/tests-ios/prelude.rs index 0b2f738..825683d 100644 --- a/tests-ios/prelude.rs +++ b/tests-ios/prelude.rs @@ -1,10 +1,20 @@ +#[macro_use] +extern crate log; +extern crate pretty_env_logger; +extern crate tempfile; + extern crate dispatch; -use std::sync::{Arc, Mutex}; +use std::cell::RefCell; +use std::mem; +use std::os::raw::{c_long, c_void}; +use std::sync::{Arc, Barrier, Mutex}; use std::time::{Duration, Instant}; -use dispatch::*; +use tempfile::tempfile; + use dispatch::ffi::*; +use dispatch::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); @@ -13,3 +23,14 @@ fn async_increment(queue: &Queue, num: &Arc>) { *num += 1; }); } + +fn async_increment_block(queue: &Queue, num: &Arc>) { + let num = num.clone(); + queue.async_block(DispatchBlock::create( + BlockFlags::ASSIGN_CURRENT, + move || { + let mut num = num.lock().unwrap(); + *num += 1; + }, + )); +}