Skip to content

Commit

Permalink
improve piped stream implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Álvarez <miguelwork92@gmail.com>
  • Loading branch information
GiviMAD committed Dec 31, 2023
1 parent c9c7dc2 commit 8b667c4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Objects;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,13 +38,16 @@
*
* @author Miguel Álvarez Díez - Initial contribution
*/
@NonNullByDefault
public class PipedAudioStream extends AudioStream {
private final AudioFormat format;
private final PipedInputStream pipedInput;
private AtomicBoolean closed = new AtomicBoolean(false);
private @Nullable Runnable onCloseChain;
private final PipedOutputStream pipedOutput;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final LinkedList<Runnable> onCloseChain = new LinkedList<>();

protected PipedAudioStream(AudioFormat format, int pipeSize, PipedOutputStream outputStream) throws IOException {
this.pipedOutput = outputStream;
this.pipedInput = new PipedInputStream(outputStream, pipeSize);
this.format = format;
}
Expand Down Expand Up @@ -82,9 +86,11 @@ public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
if (this.onCloseChain != null) {
this.onCloseChain.run();
if (this.onCloseChain.size() > 0) {
this.onCloseChain.forEach(Runnable::run);
this.onCloseChain.clear();
}
pipedOutput.close();
pipedInput.close();
}

Expand All @@ -95,14 +101,11 @@ public void close() throws IOException {
* @param onClose block to run on stream close.
*/
public void onClose(Runnable onClose) {
final @Nullable Runnable currentOnClose = this.onCloseChain;
// chain previous on close call
this.onCloseChain = () -> {
if (currentOnClose != null) {
currentOnClose.run();
}
onClose.run();
};
this.onCloseChain.add(onClose);
}

protected PipedOutputStream getOutputStream() {
return pipedOutput;
}

/**
Expand All @@ -114,12 +117,7 @@ public void onClose(Runnable onClose) {
* @return a group instance.
*/
public static Group newGroup(AudioFormat format) {
int pipeSize = Math.round(( //
(float) Objects.requireNonNull(format.getFrequency()) * //
(float) Objects.requireNonNull(format.getBitDepth()) * //
(float) Objects.requireNonNull(format.getChannels()) //
));
return new Group(format, pipeSize);
return new Group(format, 1024 * 10);
}

/**
Expand All @@ -143,8 +141,8 @@ public static Group newGroup(AudioFormat format, int pipeSize) {
public static class Group extends OutputStream {
private final int pipeSize;
private final AudioFormat format;
private ConcurrentLinkedQueue<PipedOutputStream> openPipes = new ConcurrentLinkedQueue<>();
private Logger logger = LoggerFactory.getLogger(Group.class);
private final ConcurrentLinkedQueue<PipedAudioStream> openPipes = new ConcurrentLinkedQueue<>();
private final Logger logger = LoggerFactory.getLogger(Group.class);

protected Group(AudioFormat format, int pipeSize) {
this.pipeSize = pipeSize;
Expand All @@ -160,11 +158,11 @@ protected Group(AudioFormat format, int pipeSize) {
*/
public PipedAudioStream getAudioStreamInGroup() throws IOException {
var pipedOutput = new PipedOutputStream();
if (!openPipes.add(pipedOutput)) {
pipedOutput.close();
var audioStream = new PipedAudioStream(format, pipeSize, pipedOutput);
if (!openPipes.add(audioStream)) {
audioStream.close();
throw new IOException("Unable to add new piped stream to group");
}
var audioStream = new PipedAudioStream(format, pipeSize, pipedOutput);
audioStream.onClose(() -> {
if (!openPipes.remove(pipedOutput)) {
logger.warn("Trying to remove an unregistered stream, this is not expected");
Expand All @@ -183,11 +181,11 @@ public boolean isEmpty() {
}

@Override
public void write(byte b[], int off, int len) {
public void write(byte @Nullable [] b, int off, int len) {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.write(b, off, len);
pipe.getOutputStream().write(b, off, len);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
Expand All @@ -204,7 +202,7 @@ public void write(int b) throws IOException {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.write(b);
pipe.getOutputStream().write(b);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
Expand All @@ -217,11 +215,11 @@ public void write(int b) throws IOException {
}

@Override
public void write(byte[] bytes) {
public void write(byte @Nullable [] bytes) {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.write(bytes);
pipe.getOutputStream().write(bytes);
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException on pipe flush: {}", e.getMessage());
} catch (IOException e) {
Expand All @@ -238,7 +236,7 @@ public void flush() {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.flush();
pipe.getOutputStream().flush();
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException while writing to pipe: {}", e.getMessage());
} catch (IOException e) {
Expand All @@ -249,5 +247,23 @@ public void flush() {
}
}
}

@Override
public void close() {
synchronized (openPipes) {
for (var pipe : openPipes) {
try {
pipe.close();
} catch (InterruptedIOException e) {
logger.warn("InterruptedIOException closing pipe: {}", e.getMessage());
} catch (IOException e) {
logger.warn("IOException closing pipe: {}", e.getMessage());
} catch (RuntimeException e) {
logger.warn("RuntimeException closing pipe: {}", e.getMessage());
}
}
openPipes.clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void startPipeWrite() {
if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> {
int lengthRead;
byte[] buffer = new byte[2024];
byte[] buffer = new byte[1024];
while (!streamGroup.isEmpty()) {
TargetDataLine stream = this.microphone;
if (stream != null) {
Expand Down

0 comments on commit 8b667c4

Please sign in to comment.