From 4c579f044d292d2e9b73796b228b0e44d4df5ebb Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Wed, 31 Aug 2022 12:44:19 +0200 Subject: [PATCH 1/3] [pulseaudio] Removing isIdle test The isIdle boolean was not properly handled. When disconnection is called, isIdle is not relevant : we should always honnor the disconnection request. In fact, isIdle prevented disconnection when it is necessary (example : when a IOException occurs when sending audio to sink) +Little bug fix on volume parsing: some volume request doesn't respond with a space after the comma separating left/right channel. Signed-off-by: Gwendal Roulleau --- .../binding/pulseaudio/internal/PulseAudioAudioSink.java | 2 -- .../pulseaudio/internal/PulseAudioAudioSource.java | 5 ----- .../internal/PulseaudioSimpleProtocolStream.java | 8 +------- .../openhab/binding/pulseaudio/internal/cli/Parser.java | 2 +- 4 files changed, 2 insertions(+), 15 deletions(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java index 8e42bb1c261e7..680185aa9b893 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java @@ -73,7 +73,6 @@ public void process(@Nullable AudioStream audioStream) final Socket clientSocketLocal = clientSocket; if (clientSocketLocal != null) { // send raw audio to the socket and to pulse audio - setIdle(false); Instant start = Instant.now(); normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream()); if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration @@ -110,7 +109,6 @@ public void process(@Nullable AudioStream audioStream) } finally { scheduleDisconnect(); } - setIdle(true); } @Override diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java index 5369f600114c9..72e2f7278023e 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java @@ -84,7 +84,6 @@ public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException if (!audioFormat.isCompatible(sourceFormat)) { throw new AudioException("Incompatible audio format requested"); } - setIdle(true); var pipeOutput = new PipedOutputStream(); var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) { @Override @@ -96,7 +95,6 @@ public void close() throws IOException { registerPipe(pipeOutput); // get raw audio from the pulse audio socket return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { - setIdle(idle); if (idle) { scheduleDisconnect(); } else { @@ -113,12 +111,10 @@ public void close() throws IOException { logger.warn( "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}", pulseaudioHandler.getHost(), port, e.getMessage()); - setIdle(true); throw e; } } catch (InterruptedException ie) { logger.info("Interrupted during source audio connection: {}", ie.getMessage()); - setIdle(true); throw new AudioException(ie); } countAttempt++; @@ -128,7 +124,6 @@ public void close() throws IOException { } finally { scheduleDisconnect(); } - setIdle(true); throw new AudioException("Unable to create input stream"); } diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java index 98c8bfb3d78dc..90a200925197e 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java @@ -43,8 +43,6 @@ public abstract class PulseaudioSimpleProtocolStream { protected @Nullable Socket clientSocket; - private boolean isIdle = true; - private @Nullable ScheduledFuture scheduledDisconnection; public PulseaudioSimpleProtocolStream(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) { @@ -75,7 +73,7 @@ public void connectIfNeeded() throws IOException, InterruptedException { */ public void disconnect() { final Socket clientSocketLocal = clientSocket; - if (clientSocketLocal != null && isIdle) { + if (clientSocketLocal != null) { logger.debug("Simple TCP Stream disconnecting"); try { clientSocketLocal.close(); @@ -114,8 +112,4 @@ public String getLabel(@Nullable Locale locale) { var label = pulseaudioHandler.getThing().getLabel(); return label != null ? label : pulseaudioHandler.getThing().getUID().getId(); } - - public void setIdle(boolean idle) { - isIdle = idle; - } } diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java index b9bcd09074e3e..e5bdf08cf278c 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java @@ -352,7 +352,7 @@ public static List parseSourceOutputs(String raw, PulseaudioClient private static int parseVolume(String vol) { int volumeTotal = 0; int nChannels = 0; - for (String channel : vol.split(", ")) { + for (String channel : vol.split(",")) { Matcher matcher = VOLUME_PATTERN.matcher(channel.trim()); if (matcher.find()) { volumeTotal += Integer.valueOf(matcher.group(3)); From 60f35468a74039c9fa05b998dbc8641369e8011a Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Sat, 3 Sep 2022 22:51:34 +0200 Subject: [PATCH 2/3] [pulseaudio] Enhancement to the idle detection for disconnection Using a counter to count client instead of a isIdle variable, which was not thread safe. The PulseaudioSimpleProtocolStream parent class is now the sole responsible for closing source or sink stream. Signed-off-by: Gwendal Roulleau --- .../internal/PulseAudioAudioSink.java | 3 +- .../internal/PulseAudioAudioSource.java | 32 ++++----- .../PulseaudioSimpleProtocolStream.java | 68 ++++++++++++++++--- .../internal/handler/PulseaudioHandler.java | 4 -- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java index 680185aa9b893..dad493e5e85fd 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java @@ -66,6 +66,7 @@ public void process(@Nullable AudioStream audioStream) if (audioStream == null) { return; } + addClientCount(); try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) { for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed try { @@ -107,7 +108,7 @@ public void process(@Nullable AudioStream audioStream) throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e); } finally { - scheduleDisconnect(); + minusClientCount(); } } diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java index 72e2f7278023e..ba5aba7cc05a7 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -94,13 +93,9 @@ public void close() throws IOException { }; registerPipe(pipeOutput); // get raw audio from the pulse audio socket - return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { - if (idle) { - scheduleDisconnect(); - } else { - // ensure pipe is writing - startPipeWrite(); - } + return new PulseAudioStream(sourceFormat, pipeInput, () -> { + // ensure pipe is writing + startPipeWrite(); }); } catch (IOException e) { disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown @@ -121,14 +116,15 @@ public void close() throws IOException { } } catch (IOException e) { throw new AudioException(e); - } finally { - scheduleDisconnect(); } throw new AudioException("Unable to create input stream"); } private synchronized void registerPipe(PipedOutputStream pipeOutput) { - this.pipeOutputs.add(pipeOutput); + boolean isAdded = this.pipeOutputs.add(pipeOutput); + if (isAdded) { + addClientCount(); + } startPipeWrite(); } @@ -186,7 +182,10 @@ private synchronized void startPipeWrite() { } private synchronized void unregisterPipe(PipedOutputStream pipeOutput) { - this.pipeOutputs.remove(pipeOutput); + boolean isRemoved = this.pipeOutputs.remove(pipeOutput); + if (isRemoved) { + minusClientCount(); + } try { Thread.sleep(0); } catch (InterruptedException ignored) { @@ -238,13 +237,13 @@ static class PulseAudioStream extends AudioStream { private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class); private final AudioFormat format; private final InputStream input; - private final Consumer setIdle; + private final Runnable activity; private boolean closed = false; - public PulseAudioStream(AudioFormat format, InputStream input, Consumer setIdle) { + public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) { this.input = input; this.format = format; - this.setIdle = setIdle; + this.activity = activity; } @Override @@ -277,14 +276,13 @@ public int read(byte @Nullable [] b, int off, int len) throws IOException { if (closed) { throw new IOException("Stream is closed"); } - setIdle.accept(false); + activity.run(); return input.read(b, off, len); } @Override public void close() throws IOException { closed = true; - setIdle.accept(true); input.close(); } }; diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java index 90a200925197e..8889a8ea300f8 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java @@ -18,6 +18,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -43,6 +44,9 @@ public abstract class PulseaudioSimpleProtocolStream { protected @Nullable Socket clientSocket; + private ReentrantLock countClientLock = new ReentrantLock(); + private Integer countClient = 0; + private @Nullable ScheduledFuture scheduledDisconnection; public PulseaudioSimpleProtocolStream(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) { @@ -52,6 +56,7 @@ public PulseaudioSimpleProtocolStream(PulseaudioHandler pulseaudioHandler, Sched /** * Connect to pulseaudio with the simple protocol + * Will schedule an attempt for disconnection after timeout * * @throws IOException * @throws InterruptedException when interrupted during the loading module wait @@ -59,12 +64,13 @@ public PulseaudioSimpleProtocolStream(PulseaudioHandler pulseaudioHandler, Sched public void connectIfNeeded() throws IOException, InterruptedException { Socket clientSocketLocal = clientSocket; if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) { - logger.debug("Simple TCP Stream connecting"); + logger.debug("Simple TCP Stream connecting for {}", getLabel(null)); String host = pulseaudioHandler.getHost(); int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary(); var clientSocketFinal = new Socket(host, port); clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout()); clientSocket = clientSocketFinal; + scheduleDisconnectIfNoClient(); } } @@ -74,7 +80,7 @@ public void connectIfNeeded() throws IOException, InterruptedException { public void disconnect() { final Socket clientSocketLocal = clientSocket; if (clientSocketLocal != null) { - logger.debug("Simple TCP Stream disconnecting"); + logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null)); try { clientSocketLocal.close(); } catch (IOException ignored) { @@ -84,15 +90,23 @@ public void disconnect() { } } - public void scheduleDisconnect() { - var scheduledDisconnectionFinal = scheduledDisconnection; - if (scheduledDisconnectionFinal != null) { - scheduledDisconnectionFinal.cancel(true); - } - int idleTimeout = pulseaudioHandler.getIdleTimeout(); - if (idleTimeout > -1) { - logger.debug("Scheduling disconnect"); - scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS); + private void scheduleDisconnectIfNoClient() { + countClientLock.lock(); + try { + if (countClient <= 0) { + var scheduledDisconnectionFinal = scheduledDisconnection; + if (scheduledDisconnectionFinal != null) { + logger.debug("Aborting next disconnect"); + scheduledDisconnectionFinal.cancel(true); + } + int idleTimeout = pulseaudioHandler.getIdleTimeout(); + if (idleTimeout > -1) { + logger.debug("Scheduling next disconnect"); + scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS); + } + } + } finally { + countClientLock.unlock(); } } @@ -112,4 +126,36 @@ public String getLabel(@Nullable Locale locale) { var label = pulseaudioHandler.getThing().getLabel(); return label != null ? label : pulseaudioHandler.getThing().getUID().getId(); } + + protected void addClientCount() { + countClientLock.lock(); + try { + countClient += 1; + logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null), + countClient); + if (countClient <= 0) { // safe against misuse + countClient = 1; + } + var scheduledDisconnectionFinal = scheduledDisconnection; + if (scheduledDisconnectionFinal != null) { + logger.debug("Aborting next disconnect"); + scheduledDisconnectionFinal.cancel(true); + } + } finally { + countClientLock.unlock(); + } + } + + protected void minusClientCount() { + countClientLock.lock(); + countClient -= 1; + logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient); + if (countClient < 0) { // safe against misuse + countClient = 0; + } + countClientLock.unlock(); + if (countClient <= 0) { + scheduleDisconnectIfNoClient(); + } + } } diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java index a4f48ef0d2589..502f230198136 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java @@ -140,8 +140,6 @@ public void run() { } catch (InterruptedException i) { logger.info("Interrupted during sink audio connection: {}", i.getMessage()); return; - } finally { - audioSink.scheduleDisconnect(); } } }); @@ -194,8 +192,6 @@ public void run() { } catch (InterruptedException i) { logger.info("Interrupted during source audio connection: {}", i.getMessage()); return; - } finally { - audioSource.scheduleDisconnect(); } } }); From b8c91663739e3d38eec1fb902af1b77c6c300d95 Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Sat, 3 Sep 2022 23:28:59 +0200 Subject: [PATCH 3/3] [pulseaudio] Small performance enhancement Avoid a costly synchronized operation for a method called very often. Signed-off-by: Gwendal Roulleau --- .../pulseaudio/internal/PulseAudioAudioSource.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java index ba5aba7cc05a7..f7acb4b0af28c 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java @@ -128,7 +128,18 @@ private synchronized void registerPipe(PipedOutputStream pipeOutput) { startPipeWrite(); } - private synchronized void startPipeWrite() { + /** + * As startPipeWrite is called for every chunk read, + * this wrapper method make the test before effectively + * locking the object (which is a costly operation) + */ + private void startPipeWrite() { + if (this.pipeWriteTask == null) { + startPipeWriteSynchronized(); + } + } + + private synchronized void startPipeWriteSynchronized() { if (this.pipeWriteTask == null) { this.pipeWriteTask = executor.submit(() -> { int lengthRead;