diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioFormat.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioFormat.java index 29f34200707..64987e08659 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioFormat.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/AudioFormat.java @@ -12,6 +12,7 @@ */ package org.openhab.core.audio; +import java.util.Objects; import java.util.Set; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -23,9 +24,13 @@ * @author Harald Kuhn - Initial contribution * @author Kelly Davis - Modified to match discussion in #584 * @author Kai Kreuzer - Moved class, included constants, added toString + * @author Miguel Álvarez Díez - Add pcm signed format */ @NonNullByDefault public class AudioFormat { + // generic pcm signed format (no container) without any further constraints + public static final AudioFormat PCM_SIGNED = new AudioFormat(AudioFormat.CONTAINER_NONE, + AudioFormat.CODEC_PCM_SIGNED, null, null, null, null); // generic mp3 format without any further constraints public static final AudioFormat MP3 = new AudioFormat(AudioFormat.CONTAINER_NONE, AudioFormat.CODEC_MP3, null, null, @@ -436,29 +441,13 @@ public boolean isCompatible(@Nullable AudioFormat audioFormat) { @Override public boolean equals(@Nullable Object obj) { if (obj instanceof AudioFormat format) { - if (!(null == getCodec() ? null == format.getCodec() : getCodec().equals(format.getCodec()))) { - return false; - } - if (!(null == getContainer() ? null == format.getContainer() - : getContainer().equals(format.getContainer()))) { - return false; - } - if (!(null == isBigEndian() ? null == format.isBigEndian() : isBigEndian().equals(format.isBigEndian()))) { - return false; - } - if (!(null == getBitDepth() ? null == format.getBitDepth() : getBitDepth().equals(format.getBitDepth()))) { - return false; - } - if (!(null == getBitRate() ? null == format.getBitRate() : getBitRate().equals(format.getBitRate()))) { - return false; - } - if (!(null == getFrequency() ? null == format.getFrequency() - : getFrequency().equals(format.getFrequency()))) { - return false; - } - if (!(null == getChannels() ? null == format.getChannels() : getChannels().equals(format.getChannels()))) { - return false; - } + return Objects.equals(getCodec(), format.getCodec()) && // + Objects.equals(getContainer(), format.getContainer()) && // + Objects.equals(isBigEndian(), format.isBigEndian()) && // + Objects.equals(getBitDepth(), format.getBitDepth()) && // + Objects.equals(getBitRate(), format.getBitRate()) && // + Objects.equals(getFrequency(), format.getFrequency()) && // + Objects.equals(getChannels(), format.getChannels()); } return super.equals(obj); } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/PipedAudioStream.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/PipedAudioStream.java new file mode 100644 index 00000000000..03b81881b48 --- /dev/null +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/PipedAudioStream.java @@ -0,0 +1,283 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.audio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.LinkedList; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of an {@link AudioStream} used to transmit raw audio data to a sink. + * + * It just pipes the audio through it, the default pipe size is equal to 0.5 seconds of audio, + * the implementation locks if you set a pipe size lower to the byte length used to write. + * + * In order to support audio multiplex out of the box you should create a {@link PipedAudioStream.Group} instance + * which can be used to create the {@link PipedAudioStream} connected to it and then write to all of them though the + * group. + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@NonNullByDefault +public class PipedAudioStream extends AudioStream { + private final AudioFormat format; + private final PipedInputStream pipedInput; + private final PipedOutputStream pipedOutput; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final LinkedList onCloseChain = new LinkedList<>(); + + protected PipedAudioStream(AudioFormat format, int pipeSize, PipedOutputStream outputStream) throws IOException { + this.pipedOutput = outputStream; + this.pipedInput = new PipedInputStream(outputStream, pipeSize); + this.format = format; + } + + @Override + public AudioFormat getFormat() { + return this.format; + } + + @Override + public int read() throws IOException { + if (closed.get()) { + return -1; + } + return pipedInput.read(); + } + + @Override + public int read(byte @Nullable [] b) throws IOException { + if (closed.get()) { + return -1; + } + return pipedInput.read(b); + } + + @Override + public int read(byte @Nullable [] b, int off, int len) throws IOException { + if (closed.get()) { + return -1; + } + return pipedInput.read(b, off, len); + } + + @Override + public void close() throws IOException { + if (closed.getAndSet(true)) { + return; + } + if (this.onCloseChain.size() > 0) { + this.onCloseChain.forEach(Runnable::run); + this.onCloseChain.clear(); + } + pipedOutput.close(); + pipedInput.close(); + } + + /** + * Add a new handler that will be executed on stream close. + * It will be chained to the previous handler if any, and executed in order. + * + * @param onClose block to run on stream close + */ + public void onClose(Runnable onClose) { + this.onCloseChain.add(onClose); + } + + protected PipedOutputStream getOutputStream() { + return pipedOutput; + } + + /** + * Creates a new piped stream group used to open new streams and write data to them. + * + * Internal pipe size is 0.5s. + * + * @param format the audio format of the group audio streams + * @return a group instance + */ + public static Group newGroup(AudioFormat format) { + int pipeSize = Math.round(( // + (float) Objects.requireNonNull(format.getFrequency()) * // + (float) Objects.requireNonNull(format.getBitDepth()) * // + (float) Objects.requireNonNull(format.getChannels()) // + ) / 2f); + return new Group(format, pipeSize); + } + + /** + * Creates a new piped stream group used to open new streams and write data to them. + * + * @param format the audio format of the group audio streams + * @param pipeSize the pipe size of the created streams + * @return a piped stream group instance + */ + public static Group newGroup(AudioFormat format, int pipeSize) { + return new Group(format, pipeSize); + } + + /** + * The {@link PipedAudioStream.Group} is an {@link OutputStream} implementation that can be use to + * create one or more {@link PipedAudioStream} instances and write to them at once. + * + * The created {@link PipedAudioStream} instances are removed from the group when closed. + */ + public static class Group extends OutputStream { + private final int pipeSize; + private final AudioFormat format; + private final ConcurrentLinkedQueue openPipes = new ConcurrentLinkedQueue<>(); + private final Logger logger = LoggerFactory.getLogger(Group.class); + + protected Group(AudioFormat format, int pipeSize) { + this.pipeSize = pipeSize; + this.format = format; + } + + /** + * Creates a new {@link PipedAudioStream} connected to the group. + * The stream unregisters itself from the group on close. + * + * @return a new {@link PipedAudioStream} to pipe data written to the group + * @throws IOException when unable to create the stream + */ + public PipedAudioStream getAudioStreamInGroup() throws IOException { + var pipedOutput = new PipedOutputStream(); + var audioStream = new PipedAudioStream(format, pipeSize, pipedOutput); + if (!openPipes.add(audioStream)) { + audioStream.close(); + throw new IOException("Unable to add new piped stream to group"); + } + audioStream.onClose(() -> { + if (!openPipes.remove(audioStream)) { + logger.warn("Trying to remove an unregistered stream, this is not expected"); + } + }); + return audioStream; + } + + /** + * Returns true if this group has no streams connected. + * + * @return true if this group has no streams connected + */ + public boolean isEmpty() { + return openPipes.isEmpty(); + } + + /** + * Returns the number of streams connected. + * + * @return the number of streams connected + */ + public int size() { + return openPipes.size(); + } + + @Override + public void write(byte @Nullable [] b, int off, int len) { + synchronized (openPipes) { + for (var pipe : openPipes) { + try { + pipe.getOutputStream().write(b, off, len); + } catch (InterruptedIOException e) { + logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage()); + } catch (IOException e) { + logger.warn("IOException while writing to pipe: {}", e.getMessage()); + } catch (RuntimeException e) { + logger.warn("RuntimeException while writing to pipe: {}", e.getMessage()); + } + } + } + } + + @Override + public void write(int b) throws IOException { + synchronized (openPipes) { + for (var pipe : openPipes) { + try { + pipe.getOutputStream().write(b); + } catch (InterruptedIOException e) { + logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage()); + } catch (IOException e) { + logger.warn("IOException while writing to pipe: {}", e.getMessage()); + } catch (RuntimeException e) { + logger.warn("RuntimeException while writing to pipe: {}", e.getMessage()); + } + } + } + } + + @Override + public void write(byte @Nullable [] bytes) { + synchronized (openPipes) { + for (var pipe : openPipes) { + try { + pipe.getOutputStream().write(bytes); + } catch (InterruptedIOException e) { + logger.warn("InterruptedIOException on pipe flush: {}", e.getMessage()); + } catch (IOException e) { + logger.warn("IOException on pipe flush: {}", e.getMessage()); + } catch (RuntimeException e) { + logger.warn("RuntimeException on pipe flush: {}", e.getMessage()); + } + } + } + } + + @Override + public void flush() { + synchronized (openPipes) { + for (var pipe : openPipes) { + try { + pipe.getOutputStream().flush(); + } catch (InterruptedIOException e) { + logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage()); + } catch (IOException e) { + logger.warn("IOException while writing to pipe: {}", e.getMessage()); + } catch (RuntimeException e) { + logger.warn("RuntimeException while writing to pipe: {}", e.getMessage()); + } + } + } + } + + @Override + public void close() { + synchronized (openPipes) { + for (var pipe : openPipes) { + try { + pipe.close(); + } catch (InterruptedIOException e) { + logger.warn("InterruptedIOException closing pipe: {}", e.getMessage()); + } catch (IOException e) { + logger.warn("IOException closing pipe: {}", e.getMessage()); + } catch (RuntimeException e) { + logger.warn("RuntimeException closing pipe: {}", e.getMessage()); + } + } + openPipes.clear(); + } + } + } +} diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java index eb3c39ff445..deae55c4cc9 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSink.java @@ -33,6 +33,7 @@ import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioSinkAsync; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.PipedAudioStream; import org.openhab.core.audio.URLAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; @@ -51,13 +52,14 @@ * * @author Kai Kreuzer - Initial contribution and API * @author Christoph Weitkamp - Added getSupportedStreams() and UnsupportedAudioStreamException + * @author Miguel Álvarez Díez - Added piped audio stream support * */ @NonNullByDefault @Component(service = AudioSink.class, immediate = true) public class JavaSoundAudioSink extends AudioSinkAsync { - private static final Logger LOGGER = LoggerFactory.getLogger(JavaSoundAudioSink.class); + private final Logger logger = LoggerFactory.getLogger(JavaSoundAudioSink.class); private boolean isMac = false; private @Nullable PercentType macVolumeValue = null; @@ -65,7 +67,8 @@ public class JavaSoundAudioSink extends AudioSinkAsync { private NamedThreadFactory threadFactory = new NamedThreadFactory("audio"); - private static final Set SUPPORTED_AUDIO_FORMATS = Set.of(AudioFormat.MP3, AudioFormat.WAV); + private static final Set SUPPORTED_AUDIO_FORMATS = Set.of(AudioFormat.MP3, AudioFormat.WAV, + AudioFormat.PCM_SIGNED); // we accept any stream private static final Set> SUPPORTED_AUDIO_STREAMS = Set.of(AudioStream.class); @@ -81,14 +84,24 @@ protected void activate(BundleContext context) { @Override public synchronized void processAsynchronously(final @Nullable AudioStream audioStream) throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { - if (audioStream != null && !AudioFormat.CODEC_MP3.equals(audioStream.getFormat().getCodec())) { + if (audioStream instanceof PipedAudioStream pipedAudioStream + && AudioFormat.PCM_SIGNED.isCompatible(pipedAudioStream.getFormat())) { + pipedAudioStream.onClose(() -> playbackFinished(pipedAudioStream)); + AudioPlayer audioPlayer = new AudioPlayer(pipedAudioStream); + audioPlayer.start(); + try { + audioPlayer.join(); + } catch (InterruptedException e) { + logger.debug("Audio stream has been interrupted."); + } + } else if (audioStream != null && !AudioFormat.CODEC_MP3.equals(audioStream.getFormat().getCodec())) { AudioPlayer audioPlayer = new AudioPlayer(audioStream); audioPlayer.start(); try { audioPlayer.join(); playbackFinished(audioStream); } catch (InterruptedException e) { - LOGGER.error("Playing audio has been interrupted."); + logger.error("Playing audio has been interrupted."); } } else { if (audioStream == null || audioStream instanceof URLAudioStream) { @@ -106,7 +119,7 @@ public synchronized void processAsynchronously(final @Nullable AudioStream audio // we start a new continuous stream and store its handle playInThread(audioStream, true); } catch (JavaLayerException e) { - LOGGER.error("An exception occurred while playing url audio stream : '{}'", e.getMessage()); + logger.error("An exception occurred while playing url audio stream : '{}'", e.getMessage()); } return; } @@ -115,7 +128,7 @@ public synchronized void processAsynchronously(final @Nullable AudioStream audio try { playInThread(audioStream, false); } catch (JavaLayerException e) { - LOGGER.error("An exception occurred while playing audio : '{}'", e.getMessage()); + logger.error("An exception occurred while playing audio : '{}'", e.getMessage()); } } } @@ -131,7 +144,7 @@ private void playInThread(final AudioStream audioStream, boolean store) throws J try { streamPlayerFinal.play(); } catch (Exception e) { - LOGGER.error("An exception occurred while playing audio : '{}'", e.getMessage()); + logger.error("An exception occurred while playing audio : '{}'", e.getMessage()); } finally { streamPlayerFinal.close(); playbackFinished(audioStream); @@ -179,7 +192,7 @@ public PercentType getVolume() throws IOException { if (volumes[0] != null) { return new PercentType(Math.round(volumes[0] * 100f)); } else { - LOGGER.warn("Cannot determine master volume level - assuming 100%"); + logger.warn("Cannot determine master volume level - assuming 100%"); return PercentType.HUNDRED; } } else { @@ -196,7 +209,7 @@ public PercentType getVolume() throws IOException { cachedVolume = new PercentType(value); macVolumeValue = cachedVolume; } catch (NumberFormatException e) { - LOGGER.warn("Cannot determine master volume level, received response '{}' - assuming 100%", value); + logger.warn("Cannot determine master volume level, received response '{}' - assuming 100%", value); return PercentType.HUNDRED; } } @@ -236,7 +249,7 @@ private void runVolumeCommand(Function closure) { } port.close(); } catch (LineUnavailableException e) { - LOGGER.error("Cannot access master volume control", e); + logger.error("Cannot access master volume control", e); } } } diff --git a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSource.java b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSource.java index b464fb938a5..571f4665b85 100644 --- a/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSource.java +++ b/bundles/org.openhab.core.audio/src/main/java/org/openhab/core/audio/internal/javasound/JavaSoundAudioSource.java @@ -13,13 +13,8 @@ package org.openhab.core.audio.internal.javasound; import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.util.Locale; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -32,6 +27,7 @@ import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioSource; import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.PipedAudioStream; import org.openhab.core.common.ThreadPoolManager; import org.osgi.service.component.annotations.Component; import org.slf4j.Logger; @@ -43,6 +39,7 @@ * @author Kelly Davis - Initial contribution and API * @author Kai Kreuzer - Refactored and stabilized * @author Miguel Álvarez - Share microphone line only under Windows OS + * @author Miguel Álvarez - Share microphone line on all OS using piped audio streams * */ @NonNullByDefault @@ -63,22 +60,17 @@ public class JavaSoundAudioSource implements AudioSource { private final AudioFormat audioFormat = convertAudioFormat(format); /** - * Running on Windows OS - */ - private final boolean windowsOS = System.getProperty("os.name", "Unknown").startsWith("Win"); - - /** - * TargetDataLine for sharing the mic on Windows OS due to limitations + * TargetDataLine for sharing the mic */ private @Nullable TargetDataLine microphone; /** - * Set for control microphone sharing on Windows OS + * Group for microphone sharing */ - private final ConcurrentLinkedQueue openStreamRefs = new ConcurrentLinkedQueue<>(); + private final PipedAudioStream.Group streamGroup = PipedAudioStream.newGroup(audioFormat); /** - * Task for writing microphone data to each of the open sources on Windows OS + * Task for writing microphone data to each of the open sources */ private @Nullable Future pipeWriteTask; @@ -106,50 +98,35 @@ public AudioStream getInputStream(AudioFormat expectedFormat) throws AudioExcept if (!expectedFormat.isCompatible(audioFormat)) { throw new AudioException("Cannot produce streams in format " + expectedFormat); } - // on OSs other than windows we can open multiple lines for the microphone - if (!windowsOS) { - TargetDataLine microphone = initMicrophone(format); - var inputStream = new JavaSoundInputStream(new InputStream() { - @Override - public int read() throws IOException { - return microphone.available(); - } - - @Override - public int read(byte @Nullable [] b, int off, int len) throws IOException { - return microphone.read(b, off, len); - } - - @Override - public void close() throws IOException { - microphone.close(); - } - }, audioFormat); - microphone.start(); - return inputStream; - } - // on Windows OS we share the microphone line - synchronized (openStreamRefs) { + synchronized (streamGroup) { TargetDataLine microphone = this.microphone; if (microphone == null) { microphone = initMicrophone(format); this.microphone = microphone; } - var pipedOutputStream = new PipedOutputStream(); - PipedInputStream pipedInputStream; + PipedAudioStream audioStream; try { - pipedInputStream = new PipedInputStream(pipedOutputStream, 1024 * 10) { - @Override - public void close() throws IOException { - unregisterPipe(pipedOutputStream); - super.close(); + audioStream = streamGroup.getAudioStreamInGroup(); + audioStream.onClose(() -> { + synchronized (streamGroup) { + if (streamGroup.isEmpty()) { + Future pipeWriteTask = this.pipeWriteTask; + if (pipeWriteTask != null) { + pipeWriteTask.cancel(true); + this.pipeWriteTask = null; + } + TargetDataLine microphoneDataLine = this.microphone; + if (microphoneDataLine != null) { + microphoneDataLine.close(); + this.microphone = null; + } + } } - }; + }); } catch (IOException ie) { throw new AudioException("Cannot open stream pipe: " + ie.getMessage()); } - openStreamRefs.add(pipedOutputStream); - var inputStream = new JavaSoundInputStream(pipedInputStream, audioFormat); + var inputStream = new JavaSoundInputStream(audioStream, audioFormat); microphone.start(); startPipeWrite(); return inputStream; @@ -161,30 +138,13 @@ private void startPipeWrite() { this.pipeWriteTask = executor.submit(() -> { int lengthRead; byte[] buffer = new byte[1024]; - while (!openStreamRefs.isEmpty()) { + while (!streamGroup.isEmpty()) { TargetDataLine stream = this.microphone; if (stream != null) { try { lengthRead = stream.read(buffer, 0, buffer.length); - for (PipedOutputStream output : openStreamRefs) { - try { - output.write(buffer, 0, lengthRead); - if (openStreamRefs.contains(output)) { - output.flush(); - } - } catch (InterruptedIOException e) { - if (openStreamRefs.isEmpty()) { - // task has been ended while writing - return; - } - logger.warn("InterruptedIOException while writing to source pipe: {}", - e.getMessage()); - } catch (IOException e) { - logger.warn("IOException while writing to source pipe: {}", e.getMessage()); - } catch (RuntimeException e) { - logger.warn("RuntimeException while writing to source pipe: {}", e.getMessage()); - } - } + streamGroup.write(buffer, 0, lengthRead); + streamGroup.flush(); } catch (RuntimeException e) { logger.warn("RuntimeException while reading from JavaSound source: {}", e.getMessage()); } @@ -197,32 +157,6 @@ private void startPipeWrite() { } } - private void unregisterPipe(PipedOutputStream pipedOutputStream) { - synchronized (openStreamRefs) { - openStreamRefs.remove(pipedOutputStream); - try { - Thread.sleep(0); - } catch (InterruptedException ignored) { - } - if (openStreamRefs.isEmpty()) { - Future pipeWriteTask = this.pipeWriteTask; - if (pipeWriteTask != null) { - pipeWriteTask.cancel(true); - this.pipeWriteTask = null; - } - TargetDataLine microphone = this.microphone; - if (microphone != null) { - microphone.close(); - this.microphone = null; - } - } - try { - pipedOutputStream.close(); - } catch (IOException ignored) { - } - } - } - @Override public String toString() { return "javasound"; @@ -235,7 +169,7 @@ public String toString() { * @return The converted AudioFormat */ private static AudioFormat convertAudioFormat(javax.sound.sampled.AudioFormat audioFormat) { - String container = AudioFormat.CONTAINER_WAVE; + String container = AudioFormat.CONTAINER_NONE; String codec = audioFormat.getEncoding().toString();