diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 39add76..98cf032 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -6,8 +6,9 @@ package org.usb4java.javax; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.usb.UsbControlIrp; import javax.usb.UsbException; @@ -27,18 +28,18 @@ */ abstract class AbstractIrpQueue { - /** The queued packets. */ - private final Queue irps = new ConcurrentLinkedQueue(); - - /** The queue processor thread. */ - private volatile Thread processor; - /** If queue is currently aborting. */ private volatile boolean aborting; /** The USB device. */ private final AbstractDevice device; + /** The non-parallel ExecutorService we will use for this queue on this device. */ + private final ExecutorService singleThreadExecutor; + + /** The job counter for active jobs in this queue. */ + private final AtomicInteger activeJobs = new AtomicInteger(0); + /** * Constructor. * @@ -50,6 +51,8 @@ abstract class AbstractIrpQueue if (device == null) throw new IllegalArgumentException("device must be set"); this.device = device; + + this.singleThreadExecutor = Services.getInstance().getConfig().newExecutorService(); } /** @@ -60,73 +63,29 @@ abstract class AbstractIrpQueue */ public final void add(final T irp) { - this.irps.add(irp); - - // Start the queue processor if not already running. - if (this.processor == null) - { - this.processor = new Thread(new Runnable() - { - @Override - public void run() - { - process(); - } - }); - this.processor.setDaemon(true); - this.processor.setName("usb4java IRP Queue Processor"); - this.processor.start(); - } - } - - /** - * Processes the queue. Methods returns when the queue is empty. - */ - final void process() - { - // Get the next IRP - T irp = this.irps.poll(); - - // If there are no IRPs to process then mark the thread as closing - // right away. Otherwise process the IRP (and more IRPs from the queue - // if present). - if (irp == null) - { - this.processor = null; - } - else - { - while (irp != null) - { - // Process the IRP - try - { - processIrp(irp); + singleThreadExecutor.execute(new Runnable() { + final T irp0 = irp; + + @Override + public void run() { + activeJobs.incrementAndGet(); + + try { + if (!aborting) { + try { + processIrp(irp0); + } catch (final UsbException e) { + irp0.setUsbException(e); + } + + irp0.complete(); + finishIrp(irp0); + } + } finally { + activeJobs.decrementAndGet(); } - catch (final UsbException e) - { - irp.setUsbException(e); - } - - // Get next IRP and mark the thread as closing before sending - // the events for the previous IRP - final T nextIrp = this.irps.poll(); - if (nextIrp == null) this.processor = null; - - // Finish the previous IRP - irp.complete(); - finishIrp(irp); - - // Process next IRP (if present) - irp = nextIrp; } - } - - // No more IRPs are present in the queue so terminate the thread. - synchronized (this.irps) - { - this.irps.notifyAll(); - } + }); } /** @@ -156,22 +115,15 @@ final void process() public final void abort() { this.aborting = true; - this.irps.clear(); - while (isBusy()) - { - try - { - synchronized (this.irps) - { - if (isBusy()) this.irps.wait(); - } - } - catch (final InterruptedException e) - { - Thread.currentThread().interrupt(); - } + + singleThreadExecutor.shutdown(); + try { + singleThreadExecutor.awaitTermination(4, TimeUnit.SECONDS); + this.aborting = false; + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); } - this.aborting = false; } /** @@ -180,9 +132,9 @@ public final void abort() * * @return True if queue is busy, false if not. */ - public final boolean isBusy() + public final synchronized boolean isBusy() { - return !this.irps.isEmpty() || this.processor != null; + return activeJobs.get() > 0; } /** diff --git a/src/main/java/org/usb4java/javax/Config.java b/src/main/java/org/usb4java/javax/Config.java index fef6399..7ff9a58 100644 --- a/src/main/java/org/usb4java/javax/Config.java +++ b/src/main/java/org/usb4java/javax/Config.java @@ -6,6 +6,8 @@ package org.usb4java.javax; import java.util.Properties; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * Configuration. @@ -29,12 +31,54 @@ final class Config /** Key name for USB communication timeout. */ private static final String SCAN_INTERVAL_KEY = KEY_BASE + "scanInterval"; + /** Key name for USB IRP executor. */ + private static final String EXECUTOR_SERVICE_KEY = KEY_BASE + "irpExecutorService"; + /** The timeout for USB communication in milliseconds. */ private int timeout = DEFAULT_TIMEOUT; /** The scan interval in milliseconds. */ private int scanInterval = DEFAULT_SCAN_INTERVAL; + /** The executor service factory. */ + private ExecutorServiceProvider executorService = new ExecutorServiceProvider() { + private final AtomicInteger poolNumber = new AtomicInteger(1); + + class LocalThreadFactory extends Object implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + LocalThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "usb4java-irp-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + t.setDaemon(true); + if (t.getPriority() != Thread.MAX_PRIORITY) + t.setPriority(Thread.MAX_PRIORITY); + return t; + } + } + + public ExecutorService newExecutorService() { + /* The default executor is a pool of max 1 thread, with 3s timeout. */ + ThreadPoolExecutor es = new ThreadPoolExecutor(0, 1, + 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue()); + es.setThreadFactory(new LocalThreadFactory()); + return es; + } + }; + /** * Constructs new configuration from the specified properties. * @@ -55,6 +99,22 @@ final class Config this.scanInterval = Integer.valueOf(properties.getProperty( SCAN_INTERVAL_KEY)); } + + // Read the irp executor class + if (properties.containsKey(EXECUTOR_SERVICE_KEY)) + { + try { + Class cls = getClass().getClassLoader().loadClass(properties.getProperty( + EXECUTOR_SERVICE_KEY)); + this.executorService = (ExecutorServiceProvider)cls.newInstance(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } } /** @@ -76,4 +136,10 @@ public int getScanInterval() { return this.scanInterval; } + + /** + * Creates a new non-parallel execution service. Defaults to single thread exec + * @return new non-parallel ExecutorService + */ + public ExecutorService newExecutorService() { return this.executorService.newExecutorService(); } } diff --git a/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java b/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java new file mode 100644 index 0000000..5215c3c --- /dev/null +++ b/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java @@ -0,0 +1,7 @@ +package org.usb4java.javax; + +import java.util.concurrent.ExecutorService; + +public interface ExecutorServiceProvider { + public ExecutorService newExecutorService(); +} \ No newline at end of file