Skip to content

Commit

Permalink
[Audio] Add piped audio stream and fix raw PCM streams format (openha…
Browse files Browse the repository at this point in the history
…b#3960)

* [Audio] Add piped audio stream and fix pcm format usage

Signed-off-by: Miguel Álvarez <miguelwork92@gmail.com>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
  • Loading branch information
GiviMAD authored and Ciprian Pascu committed Jan 17, 2024
1 parent 6a280c9 commit 8034416
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.openhab.core.audio;

import java.util.Objects;
import java.util.Set;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand All @@ -23,9 +24,13 @@
* @author Harald Kuhn - Initial contribution
* @author Kelly Davis - Modified to match discussion in #584
* @author Kai Kreuzer - Moved class, included constants, added toString
* @author Miguel Álvarez Díez - Add pcm signed format
*/
@NonNullByDefault
public class AudioFormat {
// generic pcm signed format (no container) without any further constraints
public static final AudioFormat PCM_SIGNED = new AudioFormat(AudioFormat.CONTAINER_NONE,
AudioFormat.CODEC_PCM_SIGNED, null, null, null, null);

// generic mp3 format without any further constraints
public static final AudioFormat MP3 = new AudioFormat(AudioFormat.CONTAINER_NONE, AudioFormat.CODEC_MP3, null, null,
Expand Down Expand Up @@ -436,29 +441,13 @@ public boolean isCompatible(@Nullable AudioFormat audioFormat) {
@Override
public boolean equals(@Nullable Object obj) {
if (obj instanceof AudioFormat format) {
if (!(null == getCodec() ? null == format.getCodec() : getCodec().equals(format.getCodec()))) {
return false;
}
if (!(null == getContainer() ? null == format.getContainer()
: getContainer().equals(format.getContainer()))) {
return false;
}
if (!(null == isBigEndian() ? null == format.isBigEndian() : isBigEndian().equals(format.isBigEndian()))) {
return false;
}
if (!(null == getBitDepth() ? null == format.getBitDepth() : getBitDepth().equals(format.getBitDepth()))) {
return false;
}
if (!(null == getBitRate() ? null == format.getBitRate() : getBitRate().equals(format.getBitRate()))) {
return false;
}
if (!(null == getFrequency() ? null == format.getFrequency()
: getFrequency().equals(format.getFrequency()))) {
return false;
}
if (!(null == getChannels() ? null == format.getChannels() : getChannels().equals(format.getChannels()))) {
return false;
}
return Objects.equals(getCodec(), format.getCodec()) && //
Objects.equals(getContainer(), format.getContainer()) && //
Objects.equals(isBigEndian(), format.isBigEndian()) && //
Objects.equals(getBitDepth(), format.getBitDepth()) && //
Objects.equals(getBitRate(), format.getBitRate()) && //
Objects.equals(getFrequency(), format.getFrequency()) && //
Objects.equals(getChannels(), format.getChannels());
}
return super.equals(obj);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.core.audio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This is an implementation of an {@link AudioStream} used to transmit raw audio data to a sink.
*
* It just pipes the audio through it, the default pipe size is equal to 0.5 seconds of audio,
* the implementation locks if you set a pipe size lower to the byte length used to write.
*
* In order to support audio multiplex out of the box you should create a {@link PipedAudioStream.Group} instance
* which can be used to create the {@link PipedAudioStream} connected to it and then write to all of them though the
* group.
*
* @author Miguel Álvarez Díez - Initial contribution
*/
@NonNullByDefault
public class PipedAudioStream extends AudioStream {
private final AudioFormat format;
private final PipedInputStream pipedInput;
private final PipedOutputStream pipedOutput;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final LinkedList<Runnable> onCloseChain = new LinkedList<>();

protected PipedAudioStream(AudioFormat format, int pipeSize, PipedOutputStream outputStream) throws IOException {
this.pipedOutput = outputStream;
this.pipedInput = new PipedInputStream(outputStream, pipeSize);
this.format = format;
}

@Override
public AudioFormat getFormat() {
return this.format;
}

@Override
public int read() throws IOException {
if (closed.get()) {
return -1;
}
return pipedInput.read();
}

@Override
public int read(byte @Nullable [] b) throws IOException {
if (closed.get()) {
return -1;
}
return pipedInput.read(b);
}

@Override
public int read(byte @Nullable [] b, int off, int len) throws IOException {
if (closed.get()) {
return -1;
}
return pipedInput.read(b, off, len);
}

@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
if (this.onCloseChain.size() > 0) {
this.onCloseChain.forEach(Runnable::run);
this.onCloseChain.clear();
}
pipedOutput.close();
pipedInput.close();
}

/**
* Add a new handler that will be executed on stream close.
* It will be chained to the previous handler if any, and executed in order.
*
* @param onClose block to run on stream close
*/
public void onClose(Runnable onClose) {
this.onCloseChain.add(onClose);
}

protected PipedOutputStream getOutputStream() {
return pipedOutput;
}

/**
* Creates a new piped stream group used to open new streams and write data to them.
*
* Internal pipe size is 0.5s.
*
* @param format the audio format of the group audio streams
* @return a group instance
*/
public static Group newGroup(AudioFormat format) {
int pipeSize = Math.round(( //
(float) Objects.requireNonNull(format.getFrequency()) * //
(float) Objects.requireNonNull(format.getBitDepth()) * //
(float) Objects.requireNonNull(format.getChannels()) //
) / 2f);
return new Group(format, pipeSize);
}

/**
* Creates a new piped stream group used to open new streams and write data to them.
*
* @param format the audio format of the group audio streams
* @param pipeSize the pipe size of the created streams
* @return a piped stream group instance
*/
public static Group newGroup(AudioFormat format, int pipeSize) {
return new Group(format, pipeSize);
}

/**
* The {@link PipedAudioStream.Group} is an {@link OutputStream} implementation that can be use to
* create one or more {@link PipedAudioStream} instances and write to them at once.
*
* The created {@link PipedAudioStream} instances are removed from the group when closed.
*/
public static class Group extends OutputStream {
private final int pipeSize;
private final AudioFormat format;
private final ConcurrentLinkedQueue<PipedAudioStream> openPipes = new ConcurrentLinkedQueue<>();
private final Logger logger = LoggerFactory.getLogger(Group.class);

protected Group(AudioFormat format, int pipeSize) {
this.pipeSize = pipeSize;
this.format = format;
}

/**
* Creates a new {@link PipedAudioStream} connected to the group.
* The stream unregisters itself from the group on close.
*
* @return a new {@link PipedAudioStream} to pipe data written to the group
* @throws IOException when unable to create the stream
*/
public PipedAudioStream getAudioStreamInGroup() throws IOException {
var pipedOutput = new PipedOutputStream();
var audioStream = new PipedAudioStream(format, pipeSize, pipedOutput);
if (!openPipes.add(audioStream)) {
audioStream.close();
throw new IOException("Unable to add new piped stream to group");
}
audioStream.onClose(() -> {
if (!openPipes.remove(audioStream)) {
logger.warn("Trying to remove an unregistered stream, this is not expected");
}
});
return audioStream;
}

/**
* Returns true if this group has no streams connected.
*
* @return true if this group has no streams connected
*/
public boolean isEmpty() {
return openPipes.isEmpty();
}

/**
* Returns the number of streams connected.
*
* @return the number of streams connected
*/
public int size() {
return openPipes.size();
}

@Override
public void write(byte @Nullable [] b, int off, int len) {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.getOutputStream().write(b, off, len);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException while writing to pipe: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException while writing to pipe: {}", e.getMessage());
}
}
}
}

@Override
public void write(int b) throws IOException {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.getOutputStream().write(b);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException while writing to pipe: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException while writing to pipe: {}", e.getMessage());
}
}
}
}

@Override
public void write(byte @Nullable [] bytes) {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.getOutputStream().write(bytes);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException on pipe flush: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException on pipe flush: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException on pipe flush: {}", e.getMessage());
}
}
}
}

@Override
public void flush() {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.getOutputStream().flush();
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException while writing to pipe: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException while writing to pipe: {}", e.getMessage());
}
}
}
}

@Override
public void close() {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.close();
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException closing pipe: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException closing pipe: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException closing pipe: {}", e.getMessage());
}
}
openPipes.clear();
}
}
}
}
Loading

0 comments on commit 8034416

Please sign in to comment.