From db0041b87ed119f7a3372cb414228bdad9d1633f Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Thu, 10 Mar 2016 21:25:16 -0800 Subject: [PATCH] Add wrapper for libusb asynchronous transfer API --- examples/async.rs | 131 +++++++++++++++++++ examples/read_device.rs | 2 +- src/async_io.rs | 267 +++++++++++++++++++++++++++++++++++++++ src/config_descriptor.rs | 2 +- src/context.rs | 2 +- src/device.rs | 2 +- src/device_handle.rs | 4 +- src/lib.rs | 2 + 8 files changed, 406 insertions(+), 6 deletions(-) create mode 100644 examples/async.rs create mode 100644 src/async_io.rs diff --git a/examples/async.rs b/examples/async.rs new file mode 100644 index 0000000..ffc0867 --- /dev/null +++ b/examples/async.rs @@ -0,0 +1,131 @@ +extern crate libusb; +use std::str::FromStr; +use std::time::Duration; + +#[derive(Debug)] +struct Endpoint { + config: u8, + iface: u8, + setting: u8, + address: u8 +} + +fn main() { + let args: Vec = std::env::args().collect(); + + if args.len() < 3 { + println!("usage: async "); + return; + } + + let vid: u16 = FromStr::from_str(args[1].as_ref()).unwrap(); + let pid: u16 = FromStr::from_str(args[2].as_ref()).unwrap(); + + let context = libusb::Context::new().unwrap(); + + let (device, device_desc, mut handle) = open_device(&context, vid, pid).expect("Could not open device"); + read_device(&context, &device, &device_desc, &mut handle).unwrap(); +} + +fn open_device(context: &libusb::Context, vid: u16, pid: u16) -> Option<(libusb::Device, libusb::DeviceDescriptor, libusb::DeviceHandle)> { + let devices = match context.devices() { + Ok(d) => d, + Err(_) => return None + }; + + for device in devices.iter() { + let device_desc = match device.device_descriptor() { + Ok(d) => d, + Err(_) => continue + }; + + if device_desc.vendor_id() == vid && device_desc.product_id() == pid { + match device.open() { + Ok(handle) => return Some((device, device_desc, handle)), + Err(_) => continue + } + } + } + + None +} + +fn read_device(context: &libusb::Context, device: &libusb::Device, device_desc: &libusb::DeviceDescriptor, handle: &mut libusb::DeviceHandle) -> libusb::Result<()> { + match find_readable_endpoint(device, device_desc, libusb::TransferType::Interrupt) { + Some(endpoint) => read_endpoint(context, handle, endpoint, libusb::TransferType::Interrupt), + None => println!("No readable interrupt endpoint") + } + + match find_readable_endpoint(device, device_desc, libusb::TransferType::Bulk) { + Some(endpoint) => read_endpoint(context, handle, endpoint, libusb::TransferType::Bulk), + None => println!("No readable bulk endpoint") + } + + Ok(()) +} + +fn find_readable_endpoint(device: &libusb::Device, device_desc: &libusb::DeviceDescriptor, transfer_type: libusb::TransferType) -> Option { + for n in 0..device_desc.num_configurations() { + let config_desc = match device.config_descriptor(n) { + Ok(c) => c, + Err(_) => continue + }; + + for interface in config_desc.interfaces() { + for interface_desc in interface.descriptors() { + for endpoint_desc in interface_desc.endpoint_descriptors() { + if endpoint_desc.direction() == libusb::Direction::In && endpoint_desc.transfer_type() == transfer_type { + return Some(Endpoint { + config: config_desc.number(), + iface: interface_desc.interface_number(), + setting: interface_desc.setting_number(), + address: endpoint_desc.address() + }); + } + } + } + } + } + + None +} + +fn read_endpoint(context: &libusb::Context, handle: &mut libusb::DeviceHandle, endpoint: Endpoint, transfer_type: libusb::TransferType) { + println!("Reading from endpoint: {:?}", endpoint); + + configure_endpoint(handle, &endpoint).unwrap(); + + let mut buffers = [[0u8; 128]; 8]; + + { + let mut async_group = ::libusb::AsyncGroup::new(context); + let timeout = Duration::from_secs(1); + + match transfer_type { + libusb::TransferType::Interrupt => { + for buf in &mut buffers { + async_group.submit(::libusb::Transfer::interrupt(handle, endpoint.address, buf, timeout)).unwrap(); + } + }, + libusb::TransferType::Bulk => { + for buf in &mut buffers { + async_group.submit(::libusb::Transfer::bulk(handle, endpoint.address, buf, timeout)).unwrap(); + } + } + _ => unimplemented!() + } + + loop { + let mut transfer = async_group.wait_any().unwrap(); + println!("Read: {:?} {:?}", transfer.status(), transfer.actual()); + async_group.submit(transfer).unwrap(); + } + } +} + +fn configure_endpoint<'a>(handle: &'a mut libusb::DeviceHandle, endpoint: &Endpoint) -> libusb::Result<()> { + handle.set_active_configuration(endpoint.config)?; + handle.claim_interface(endpoint.iface)?; + handle.set_alternate_setting(endpoint.iface, endpoint.setting)?; + Ok(()) +} diff --git a/examples/read_device.rs b/examples/read_device.rs index 98197a6..065e59f 100644 --- a/examples/read_device.rs +++ b/examples/read_device.rs @@ -129,7 +129,7 @@ fn read_endpoint(handle: &mut libusb::DeviceHandle, endpoint: Endpoint, transfer match configure_endpoint(handle, &endpoint) { Ok(_) => { let mut vec = Vec::::with_capacity(256); - let mut buf = unsafe { slice::from_raw_parts_mut((&mut vec[..]).as_mut_ptr(), vec.capacity()) }; + let buf = unsafe { slice::from_raw_parts_mut((&mut vec[..]).as_mut_ptr(), vec.capacity()) }; let timeout = Duration::from_secs(1); diff --git a/src/async_io.rs b/src/async_io.rs new file mode 100644 index 0000000..be63b7b --- /dev/null +++ b/src/async_io.rs @@ -0,0 +1,267 @@ +use libc::{c_int, c_uchar, c_uint, c_void}; +use std::cell::UnsafeCell; +use std::collections::{HashSet, VecDeque}; +use std::{marker::PhantomData, mem, slice, sync::Mutex, time::Duration}; + +use crate::{constants::*, Context, DeviceHandle, Error, Result}; + +/// An asynchronous transfer that is not currently pending. +/// Specifies the data necessary to perform a transfer on a specified endpoint, and holds the +/// result of a completed transfer. A completed Transfer can be resubmitted. +pub struct Transfer<'d> { + _handle: PhantomData<&'d DeviceHandle<'d>>, // transfer.dev_handle + _buffer: PhantomData<&'d mut [u8]>, // transfer.data + transfer: *mut libusb_sys::libusb_transfer, +} + +/// The status of a Transfer returned by wait_any. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum TransferStatus { + /// Completed without error + Success = LIBUSB_TRANSFER_COMPLETED as isize, + + /// Failed (IO error) + Error = LIBUSB_TRANSFER_ERROR as isize, + + /// Timed out + Timeout = LIBUSB_TRANSFER_TIMED_OUT as isize, + + /// Cancelled + Cancelled = LIBUSB_TRANSFER_CANCELLED as isize, + + /// Endpoint stalled or control request not supported + Stall = LIBUSB_TRANSFER_STALL as isize, + + /// Device was disconnected + NoDevice = LIBUSB_TRANSFER_NO_DEVICE as isize, + + /// Device sent more data than requested + Overflow = LIBUSB_TRANSFER_OVERFLOW as isize, + + /// No status, not yet submitted + Unknown = -1 as isize, +} + +impl<'d> Transfer<'d> { + fn new( + handle: &'d DeviceHandle<'d>, + endpoint: u8, + transfer_type: c_uchar, + buffer: &'d mut [u8], + timeout: Duration, + ) -> Transfer<'d> { + let timeout_ms = timeout.as_secs() * 1000 + timeout.subsec_nanos() as u64 / 1_000_000; + unsafe { + let t = libusb_sys::libusb_alloc_transfer(0); + (*t).status = -1; + (*t).dev_handle = handle.as_raw(); + (*t).endpoint = endpoint as c_uchar; + (*t).transfer_type = transfer_type; + (*t).timeout = timeout_ms as c_uint; + (*t).buffer = buffer.as_mut_ptr(); + (*t).length = buffer.len() as i32; + (*t).actual_length = 0; + + Transfer { + transfer: t, + _handle: PhantomData, + _buffer: PhantomData, + } + } + } + + /// Creates an asynchronous bulk transfer, but does not submit it. + pub fn bulk( + handle: &'d DeviceHandle<'d>, + endpoint: u8, + buffer: &'d mut [u8], + timeout: Duration, + ) -> Transfer<'d> { + Transfer::new(handle, endpoint, LIBUSB_TRANSFER_TYPE_BULK, buffer, timeout) + } + + /// Creates an asynchronous interrupt transfer, but does not submit it. + pub fn interrupt( + handle: &'d DeviceHandle<'d>, + endpoint: u8, + buffer: &'d mut [u8], + timeout: Duration, + ) -> Transfer<'d> { + Transfer::new( + handle, + endpoint, + LIBUSB_TRANSFER_TYPE_INTERRUPT, + buffer, + timeout, + ) + } + + /// Gets the status of a completed transfer. + pub fn status(&self) -> TransferStatus { + match unsafe { (*self.transfer).status } { + LIBUSB_TRANSFER_COMPLETED => TransferStatus::Success, + LIBUSB_TRANSFER_ERROR => TransferStatus::Error, + LIBUSB_TRANSFER_TIMED_OUT => TransferStatus::Timeout, + LIBUSB_TRANSFER_CANCELLED => TransferStatus::Cancelled, + LIBUSB_TRANSFER_STALL => TransferStatus::Stall, + LIBUSB_TRANSFER_NO_DEVICE => TransferStatus::NoDevice, + _ => TransferStatus::Unknown, + } + } + + /// Access the buffer of a transfer. + pub fn buffer(&mut self) -> &'d mut [u8] { + unsafe { + slice::from_raw_parts_mut((*self.transfer).buffer, (*self.transfer).length as usize) + } + } + + /// Replace the buffer of a transfer. + pub fn set_buffer(&mut self, buffer: &'d mut [u8]) { + unsafe { + (*self.transfer).buffer = buffer.as_mut_ptr(); + (*self.transfer).length = buffer.len() as i32; + (*self.transfer).actual_length = 0; + } + } + + /// Access the slice of the buffer containing actual data received on an IN transfer. + pub fn actual(&mut self) -> &'d mut [u8] { + unsafe { + slice::from_raw_parts_mut( + (*self.transfer).buffer, + (*self.transfer).actual_length as usize, + ) + } + } +} + +impl<'d> Drop for Transfer<'d> { + fn drop(&mut self) { + unsafe { + libusb_sys::libusb_free_transfer(self.transfer); + } + } +} + +/// Internal type holding data touched by libusb completion callback. +struct CallbackData { + /// Transfers that have completed, but haven't yet been returned from `wait_any`. + completed: Mutex>, + + /// Signals a completion to avoid race conditions between callback and + /// `libusb_handle_events_completed`. This is synchronized with the + /// Mutex above, but can't be included in it because libusb reads it + /// without the lock held. + flag: UnsafeCell, +} + +/// An AsyncGroup manages outstanding asynchronous transfers. +pub struct AsyncGroup<'d> { + context: &'d Context, + + /// The data touched by the callback, boxed to keep a consistent address if the AsyncGroup + /// is moved while transfers are active. + callback_data: Box, + + /// The set of pending transfers. We need to keep track of them so they can be cancelled on + /// drop. + pending: HashSet<*mut libusb_sys::libusb_transfer>, +} + +/// The libusb transfer completion callback. Careful: libusb may call this on any thread! +extern "C" fn async_group_callback(transfer: *mut libusb_sys::libusb_transfer) { + unsafe { + let callback_data: &CallbackData = &*((*transfer).user_data as *const CallbackData); + let mut completed = callback_data.completed.lock().unwrap(); + completed.push_back(transfer); + *(callback_data.flag.get()) = 1; + } +} + +impl<'d> AsyncGroup<'d> { + /// Creates an AsyncGroup to process transfers for devices from the given context. + pub fn new(context: &'d Context) -> AsyncGroup<'d> { + AsyncGroup { + context: context, + callback_data: Box::new(CallbackData { + completed: Mutex::new(VecDeque::new()), + flag: UnsafeCell::new(0), + }), + pending: HashSet::new(), + } + } + + /// Starts a transfer. + /// + /// The Transfer is owned by the AsyncGroup while it is pending, and is + /// returned from `wait_any` when it completes or fails. + pub fn submit(&mut self, t: Transfer<'d>) -> Result<()> { + unsafe { + (*t.transfer).user_data = &mut *self.callback_data as *mut _ as *mut c_void; + (*t.transfer).callback = async_group_callback; + } + try_unsafe!(libusb_sys::libusb_submit_transfer(t.transfer)); + self.pending.insert(t.transfer); + mem::forget(t); + Ok(()) + } + + /// Waits for any pending transfer to complete, and return it. + pub fn wait_any(&mut self) -> Result> { + if self.pending.len() == 0 { + // Otherwise this function would block forever waiting for a transfer to complete + return Err(Error::NotFound); + } + + { + let transfer; + loop { + { + let mut completed = self.callback_data.completed.lock().unwrap(); + if let Some(t) = completed.pop_front() { + transfer = t; + break; + } + unsafe {*self.callback_data.flag.get() = 0}; + } + try_unsafe!(libusb_sys::libusb_handle_events_completed( + self.context.as_raw(), + self.callback_data.flag.get() + )); + } + + if !self.pending.remove(&transfer) { + panic!("Got a completion for a transfer that wasn't pending"); + } + + Ok(Transfer { + transfer: transfer, + _handle: PhantomData, + _buffer: PhantomData, + }) + } + } + + /// Cancels all pending transfers. + /// + /// Throws away any received data and errors on transfers that have completed, but haven't been + /// collected by `wait_any`. + pub fn cancel_all(&mut self) -> Result<()> { + for &transfer in self.pending.iter() { + try_unsafe!(libusb_sys::libusb_cancel_transfer(transfer)) + } + + while self.pending.len() > 0 { + self.wait_any()?; + } + + Ok(()) + } +} + +impl<'d> Drop for AsyncGroup<'d> { + fn drop(&mut self) { + self.cancel_all().ok(); + } +} diff --git a/src/config_descriptor.rs b/src/config_descriptor.rs index f396570..a963a47 100644 --- a/src/config_descriptor.rs +++ b/src/config_descriptor.rs @@ -28,7 +28,7 @@ impl<'c> ConfigDescriptor { unsafe { (*self.descriptor).bConfigurationValue } } - /// Returns the device's maximum power consumption (in milliwatts) in this configuration. + /// Returns the device's maximum power consumption (in milliamps) in this configuration. pub fn max_power(&self) -> u16 { unsafe { (*self.descriptor).bMaxPower as u16 * 2 } } diff --git a/src/context.rs b/src/context.rs index 6df9a97..9ddd1ba 100644 --- a/src/context.rs +++ b/src/context.rs @@ -56,7 +56,7 @@ impl Context { } /// Get the raw libusb_context pointer, for advanced use in unsafe code. - pub fn as_raw(&self) -> *mut ::libusb::libusb_context { + pub fn as_raw(&self) -> *mut libusb_context { self.context } diff --git a/src/device.rs b/src/device.rs index 9a72975..3f102e2 100644 --- a/src/device.rs +++ b/src/device.rs @@ -29,7 +29,7 @@ unsafe impl<'a> Sync for Device<'a> {} impl<'a> Device<'a> { /// Get the raw libusb_device pointer, for advanced use in unsafe code - pub fn as_raw(&self) -> *mut ::libusb::libusb_device { + pub fn as_raw(&self) -> *mut libusb_device { self.device } diff --git a/src/device_handle.rs b/src/device_handle.rs index e3af497..7f72842 100644 --- a/src/device_handle.rs +++ b/src/device_handle.rs @@ -4,8 +4,8 @@ use std::slice; use std::time::Duration; use bit_set::BitSet; -use libusb_sys::{constants::*, *}; use libc::{c_int, c_uchar, c_uint}; +use libusb_sys::{constants::*, *}; use crate::config_descriptor::ConfigDescriptor; use crate::context::Context; @@ -43,7 +43,7 @@ impl<'a> DeviceHandle<'a> { /// /// This structure tracks claimed interfaces, and will get out if sync if interfaces are /// manipulated externally. Use only libusb endpoint IO functions. - pub fn as_raw(&self) -> *mut ::libusb::libusb_device_handle { + pub fn as_raw(&self) -> *mut libusb_device_handle { self.handle } diff --git a/src/lib.rs b/src/lib.rs index 3ddd7cf..912d8c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub use libusb_sys::constants; +pub use crate::async_io::{AsyncGroup, Transfer, TransferStatus}; pub use crate::error::{Error, Result}; pub use crate::version::{version, LibraryVersion}; @@ -30,6 +31,7 @@ mod test_helpers; mod error; mod version; +mod async_io; mod context; mod device; mod device_handle;