Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulseaudio] Apply real disconnection when needed #13338

Merged
merged 3 commits into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ public void process(@Nullable AudioStream audioStream)
if (audioStream == null) {
return;
}
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
try {
connectIfNeeded();
final Socket clientSocketLocal = clientSocket;
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
setIdle(false);
Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
Expand Down Expand Up @@ -108,9 +108,8 @@ public void process(@Nullable AudioStream audioStream)
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e);
} finally {
scheduleDisconnect();
minusClientCount();
}
setIdle(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -84,7 +83,6 @@ public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException
if (!audioFormat.isCompatible(sourceFormat)) {
throw new AudioException("Incompatible audio format requested");
}
setIdle(true);
var pipeOutput = new PipedOutputStream();
var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
@Override
Expand All @@ -95,14 +93,9 @@ public void close() throws IOException {
};
registerPipe(pipeOutput);
// get raw audio from the pulse audio socket
return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
setIdle(idle);
if (idle) {
scheduleDisconnect();
} else {
// ensure pipe is writing
startPipeWrite();
}
return new PulseAudioStream(sourceFormat, pipeInput, () -> {
GiviMAD marked this conversation as resolved.
Show resolved Hide resolved
// ensure pipe is writing
startPipeWrite();
});
} catch (IOException e) {
disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
Expand All @@ -113,31 +106,40 @@ public void close() throws IOException {
logger.warn(
"Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
setIdle(true);
throw e;
}
} catch (InterruptedException ie) {
logger.info("Interrupted during source audio connection: {}", ie.getMessage());
setIdle(true);
throw new AudioException(ie);
}
countAttempt++;
}
} catch (IOException e) {
throw new AudioException(e);
} finally {
scheduleDisconnect();
}
setIdle(true);
throw new AudioException("Unable to create input stream");
}

private synchronized void registerPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.add(pipeOutput);
boolean isAdded = this.pipeOutputs.add(pipeOutput);
if (isAdded) {
addClientCount();
}
startPipeWrite();
}

private synchronized void startPipeWrite() {
/**
* As startPipeWrite is called for every chunk read,
* this wrapper method make the test before effectively
* locking the object (which is a costly operation)
*/
private void startPipeWrite() {
if (this.pipeWriteTask == null) {
startPipeWriteSynchronized();
}
}

private synchronized void startPipeWriteSynchronized() {
if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> {
int lengthRead;
Expand Down Expand Up @@ -191,7 +193,10 @@ private synchronized void startPipeWrite() {
}

private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.remove(pipeOutput);
boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
if (isRemoved) {
minusClientCount();
}
try {
Thread.sleep(0);
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -243,13 +248,13 @@ static class PulseAudioStream extends AudioStream {
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
private final AudioFormat format;
private final InputStream input;
private final Consumer<Boolean> setIdle;
private final Runnable activity;
GiviMAD marked this conversation as resolved.
Show resolved Hide resolved
private boolean closed = false;

public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
this.input = input;
this.format = format;
this.setIdle = setIdle;
this.activity = activity;
}

@Override
Expand Down Expand Up @@ -282,14 +287,13 @@ public int read(byte @Nullable [] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
setIdle.accept(false);
activity.run();
return input.read(b, off, len);
}

@Override
public void close() throws IOException {
closed = true;
setIdle.accept(true);
input.close();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand All @@ -43,7 +44,8 @@ public abstract class PulseaudioSimpleProtocolStream {

protected @Nullable Socket clientSocket;

private boolean isIdle = true;
private ReentrantLock countClientLock = new ReentrantLock();
private Integer countClient = 0;

private @Nullable ScheduledFuture<?> scheduledDisconnection;

Expand All @@ -54,19 +56,21 @@ public PulseaudioSimpleProtocolStream(PulseaudioHandler pulseaudioHandler, Sched

/**
* Connect to pulseaudio with the simple protocol
* Will schedule an attempt for disconnection after timeout
*
* @throws IOException
* @throws InterruptedException when interrupted during the loading module wait
*/
public void connectIfNeeded() throws IOException, InterruptedException {
Socket clientSocketLocal = clientSocket;
if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) {
logger.debug("Simple TCP Stream connecting");
logger.debug("Simple TCP Stream connecting for {}", getLabel(null));
String host = pulseaudioHandler.getHost();
int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary();
var clientSocketFinal = new Socket(host, port);
clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout());
clientSocket = clientSocketFinal;
scheduleDisconnectIfNoClient();
}
}

Expand All @@ -75,8 +79,8 @@ public void connectIfNeeded() throws IOException, InterruptedException {
*/
public void disconnect() {
final Socket clientSocketLocal = clientSocket;
if (clientSocketLocal != null && isIdle) {
logger.debug("Simple TCP Stream disconnecting");
if (clientSocketLocal != null) {
logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null));
try {
clientSocketLocal.close();
} catch (IOException ignored) {
Expand All @@ -86,15 +90,23 @@ public void disconnect() {
}
}

public void scheduleDisconnect() {
var scheduledDisconnectionFinal = scheduledDisconnection;
if (scheduledDisconnectionFinal != null) {
scheduledDisconnectionFinal.cancel(true);
}
int idleTimeout = pulseaudioHandler.getIdleTimeout();
if (idleTimeout > -1) {
logger.debug("Scheduling disconnect");
scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
private void scheduleDisconnectIfNoClient() {
countClientLock.lock();
try {
if (countClient <= 0) {
var scheduledDisconnectionFinal = scheduledDisconnection;
if (scheduledDisconnectionFinal != null) {
logger.debug("Aborting next disconnect");
scheduledDisconnectionFinal.cancel(true);
}
int idleTimeout = pulseaudioHandler.getIdleTimeout();
if (idleTimeout > -1) {
logger.debug("Scheduling next disconnect");
scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
}
}
} finally {
countClientLock.unlock();
}
}

Expand All @@ -115,7 +127,35 @@ public String getLabel(@Nullable Locale locale) {
return label != null ? label : pulseaudioHandler.getThing().getUID().getId();
}

public void setIdle(boolean idle) {
isIdle = idle;
protected void addClientCount() {
countClientLock.lock();
try {
countClient += 1;
logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null),
countClient);
if (countClient <= 0) { // safe against misuse
countClient = 1;
}
var scheduledDisconnectionFinal = scheduledDisconnection;
if (scheduledDisconnectionFinal != null) {
logger.debug("Aborting next disconnect");
scheduledDisconnectionFinal.cancel(true);
}
} finally {
countClientLock.unlock();
}
}

protected void minusClientCount() {
countClientLock.lock();
countClient -= 1;
logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient);
if (countClient < 0) { // safe against misuse
countClient = 0;
}
countClientLock.unlock();
if (countClient <= 0) {
scheduleDisconnectIfNoClient();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public static List<SourceOutput> parseSourceOutputs(String raw, PulseaudioClient
private static int parseVolume(String vol) {
int volumeTotal = 0;
int nChannels = 0;
for (String channel : vol.split(", ")) {
for (String channel : vol.split(",")) {
Matcher matcher = VOLUME_PATTERN.matcher(channel.trim());
if (matcher.find()) {
volumeTotal += Integer.valueOf(matcher.group(3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ public void run() {
} catch (InterruptedException i) {
logger.info("Interrupted during sink audio connection: {}", i.getMessage());
return;
} finally {
audioSink.scheduleDisconnect();
}
}
});
Expand Down Expand Up @@ -194,8 +192,6 @@ public void run() {
} catch (InterruptedException i) {
logger.info("Interrupted during source audio connection: {}", i.getMessage());
return;
} finally {
audioSource.scheduleDisconnect();
}
}
});
Expand Down