diff --git a/bundles/org.openhab.core.io.websocket/pom.xml b/bundles/org.openhab.core.io.websocket/pom.xml index 5f5c6203104..a6023ba85ef 100644 --- a/bundles/org.openhab.core.io.websocket/pom.xml +++ b/bundles/org.openhab.core.io.websocket/pom.xml @@ -25,6 +25,18 @@ org.openhab.core.io.rest.auth ${project.version} + + org.openhab.core.bundles + org.openhab.core.audio + ${project.version} + compile + + + org.openhab.core.bundles + org.openhab.core.voice + ${project.version} + compile + diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAdapter.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAdapter.java new file mode 100644 index 00000000000..436b84a3ab1 --- /dev/null +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAdapter.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2010-2025 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. 
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.core.io.websocket.audiopcm;
+
+import static java.nio.ByteBuffer.wrap;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.openhab.core.audio.AudioManager;
+import org.openhab.core.audio.AudioSink;
+import org.openhab.core.audio.AudioSource;
+import org.openhab.core.common.ThreadPoolManager;
+import org.openhab.core.io.websocket.WebSocketAdapter;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link PCMWebSocketAdapter} creates instances of {@link PCMWebSocketConnection} to handle PCM audio.
+ * <p>
+ * It keeps track of the connections that announced themselves through
+ * {@link #onSpeakerConnected(PCMWebSocketConnection)} and pings them periodically; a connection whose ping cannot be
+ * sent gets its session closed.
+ *
+ * @author Miguel Álvarez Díez - Initial contribution
+ */
+@NonNullByDefault
+@Component(immediate = true, service = { PCMWebSocketAdapter.class, WebSocketAdapter.class })
+public class PCMWebSocketAdapter implements WebSocketAdapter {
+    public static final String ADAPTER_ID = "audio-pcm";
+
+    private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAdapter.class);
+    private final ScheduledExecutorService executor = ThreadPoolManager.getScheduledPool("audio-pcm-websocket");
+    protected final BundleContext bundleContext;
+    protected final AudioManager audioManager;
+    // written through setDialogProvider and read by the connections, i.e. from different threads
+    protected volatile PCMWebSocketAdapter.@Nullable DialogProvider dialogProvider = null;
+    private final ScheduledFuture<?> pingTask;
+    private final Set<PCMWebSocketConnection> webSocketConnections = Collections.synchronizedSet(new HashSet<>());
+
+    @Activate
+    public PCMWebSocketAdapter(BundleContext bundleContext, final @Reference AudioManager audioManager) {
+        this.bundleContext = bundleContext;
+        this.audioManager = audioManager;
+        this.pingTask = executor.scheduleWithFixedDelay(this::pingHandlers, 10, 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Adds a connection to the set of tracked speakers.
+     *
+     * @param speaker the connection to track
+     * @throws IllegalStateException if a tracked connection already uses the same id (compared ignoring case)
+     */
+    protected void onSpeakerConnected(PCMWebSocketConnection speaker) throws IllegalStateException {
+        // check and insertion happen under the same lock so two connections cannot claim one id
+        synchronized (webSocketConnections) {
+            if (getSpeakerConnection(speaker.getId()) != null) {
+                throw new IllegalStateException("Another speaker with the same id is already connected");
+            }
+            webSocketConnections.add(speaker);
+            logger.debug("connected speakers {}", webSocketConnections.size());
+        }
+    }
+
+    /**
+     * Removes a connection from the set of tracked speakers.
+     *
+     * @param connection the connection that went away
+     */
+    protected void onClientDisconnected(PCMWebSocketConnection connection) {
+        logger.debug("speaker disconnected '{}'", connection.getId());
+        synchronized (webSocketConnections) {
+            webSocketConnections.remove(connection);
+            logger.debug("connected speakers {}", webSocketConnections.size());
+        }
+    }
+
+    /**
+     * Looks up a tracked connection by its id.
+     *
+     * @param id the connection id, compared ignoring case
+     * @return the matching connection or null if none is tracked
+     */
+    public @Nullable PCMWebSocketConnection getSpeakerConnection(String id) {
+        synchronized (webSocketConnections) {
+            return webSocketConnections.stream()
+                    .filter(speakerConnection -> speakerConnection.getId().equalsIgnoreCase(id)).findAny().orElse(null);
+        }
+    }
+
+    /**
+     * Sets the provider used by the connections to start dialogs.
+     *
+     * @param dialogProvider the provider to use
+     */
+    public void setDialogProvider(DialogProvider dialogProvider) {
+        this.dialogProvider = dialogProvider;
+    }
+
+    @Override
+    public String getId() {
+        return ADAPTER_ID;
+    }
+
+    @Override
+    public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest,
+            ServletUpgradeResponse servletUpgradeResponse) {
+        logger.debug("creating connection!");
+        return new PCMWebSocketConnection(this, executor);
+    }
+
+    @Deactivate
+    @SuppressWarnings("unused")
+    public synchronized void deactivate() {
+        logger.debug("stopping connection check");
+        pingTask.cancel(true);
+        disconnectAll();
+    }
+
+    /**
+     * Pings every tracked connection and closes the session of those that cannot be pinged.
+     */
+    private void pingHandlers() {
+        // work on a snapshot so connections may come and go while pinging
+        var handlers = new ArrayList<>(webSocketConnections);
+        for (var handler : handlers) {
+            try {
+                pingOrClose(handler);
+            } catch (RuntimeException e) {
+                // an exception escaping a fixed-delay task suppresses all its further executions
+                logger.warn("Unexpected error checking speaker {}: {}", handler.getId(), e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Sends a ping through the connection; closes its session when the ping cannot be sent.
+     */
+    private void pingOrClose(PCMWebSocketConnection handler) {
+        boolean pinged = false;
+        var remote = handler.getRemote();
+        if (remote != null) {
+            try {
+                remote.sendPing(wrap("oh".getBytes(StandardCharsets.UTF_8)));
+                pinged = true;
+            } catch (IOException | RuntimeException e) {
+                logger.debug("Unable to ping speaker {}: {}", handler.getId(), e.getMessage());
+            }
+        }
+        if (!pinged) {
+            logger.warn("ping failed, disconnecting speaker {}", handler.getId());
+            var session = handler.getSession();
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Stops tracking all connections and disconnects them.
+     */
+    private void disconnectAll() {
+        logger.debug("Disconnecting {} clients...", webSocketConnections.size());
+        var connections = new ArrayList<>(webSocketConnections);
+        for (var connection : connections) {
+            onClientDisconnected(connection);
+            connection.disconnect();
+        }
+    }
+
+    /**
+     * This interface provides the dialog initialization functionality to the websocket connections.
+     */
+    public interface DialogProvider {
+        /**
+         * Starts a dialog and returns a runnable that triggers it
+         *
+         * @param webSocket the WebSocket connection associated with this dialog
+         * @param audioSink the audio sink to play sound
+         * @param audioSource the audio source to capture sound
+         * @param locationItem an Item name to scope dialog commands
+         * @param listeningItem an Item name to toggle while dialog is listening
+         * @return a {@link Runnable} instance to trigger dialog processing
+         */
+        Runnable startDialog(PCMWebSocketConnection webSocket, AudioSink audioSink, AudioSource audioSource,
+                @Nullable String locationItem, @Nullable String listeningItem);
+    }
+}
diff --git
a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSink.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSink.java
new file mode 100644
index 00000000000..c717de82df3
--- /dev/null
+++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSink.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) 2010-2025 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.core.io.websocket.audiopcm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.audio.AudioFormat;
+import org.openhab.core.audio.AudioSink;
+import org.openhab.core.audio.AudioStream;
+import org.openhab.core.audio.FixedLengthAudioStream;
+import org.openhab.core.audio.PipedAudioStream;
+import org.openhab.core.audio.SizeableAudioStream;
+import org.openhab.core.audio.UnsupportedAudioFormatException;
+import org.openhab.core.audio.UnsupportedAudioStreamException;
+import org.openhab.core.audio.utils.AudioWaveUtils;
+import org.openhab.core.library.types.PercentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an {@link AudioSink} implementation connected to the {@link PCMWebSocketConnection} that allows
+ * transmitting concurrent PCM audio lines through the websocket.
+ * <p>
+ * To identify the different audio lines the data chunks are prefixed by a header added by the
+ * {@link PCMWebSocketOutputStream} class.
+ *
+ * @author Miguel Álvarez Díez - Initial contribution
+ */
+@NonNullByDefault
+public class PCMWebSocketAudioSink implements AudioSink {
+    /**
+     * Byte sent to the sink after the last chunk to indicate that streaming has ended.
+     * It should be sent even on an error, as the client should be aware that data transmission has ended.
+     */
+    private static final byte STREAM_TERMINATION_BYTE = (byte) 254;
+    private final Set<AudioFormat> supportedFormats = Set.of(AudioFormat.WAV, AudioFormat.PCM_SIGNED);
+    private final Set<Class<? extends AudioStream>> supportedStreams = Set.of(FixedLengthAudioStream.class,
+            PipedAudioStream.class);
+
+    private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAudioSink.class);
+
+    private final String sinkId;
+    private final String sinkLabel;
+    private final PCMWebSocketConnection websocket;
+    private PercentType sinkVolume = new PercentType(100);
+    @Nullable
+    Integer forceSampleRate;
+    @Nullable
+    Integer forceBitDepth;
+    @Nullable
+    Integer forceChannels;
+
+    /**
+     * @param id the sink id
+     * @param label the sink label
+     * @param websocket the connection the audio is sent through
+     * @param forceSampleRate sample rate to convert the audio to, or null to keep the one of the stream
+     * @param forceBitDepth bit depth to convert the audio to, or null to keep the one of the stream
+     * @param forceChannels channel count to convert the audio to, or null to keep the one of the stream
+     */
+    public PCMWebSocketAudioSink(String id, String label, PCMWebSocketConnection websocket,
+            @Nullable Integer forceSampleRate, @Nullable Integer forceBitDepth, @Nullable Integer forceChannels) {
+        this.sinkId = id;
+        this.sinkLabel = label;
+        this.websocket = websocket;
+        this.forceSampleRate = forceSampleRate;
+        this.forceBitDepth = forceBitDepth;
+        this.forceChannels = forceChannels;
+    }
+
+    @Override
+    public String getId() {
+        return this.sinkId;
+    }
+
+    @Override
+    public @Nullable String getLabel(@Nullable Locale locale) {
+        return this.sinkLabel;
+    }
+
+    /**
+     * Sends the stream through the websocket, converting it first when a forced format differs from the format of
+     * the stream. For streams of known length the call additionally waits until the estimated playback time has
+     * elapsed. Both the given stream and the websocket output stream are closed before returning.
+     */
+    @Override
+    public void process(@Nullable AudioStream audioStream)
+            throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
+        if (audioStream == null) {
+            return;
+        }
+        OutputStream outputStream = null;
+        try {
+            long duration = -1;
+            if (AudioFormat.CONTAINER_WAVE.equals(audioStream.getFormat().getContainer())) {
+                logger.debug("Removing wav container from data");
+                try {
+                    AudioWaveUtils.removeFMT(audioStream);
+                } catch (IOException e) {
+                    logger.warn("IOException trying to remove wav header: {}", e.getMessage());
+                }
+            }
+            var audioFormat = audioStream.getFormat();
+            int sampleRate = Objects.requireNonNull(audioFormat.getFrequency()).intValue();
+            int bitDepth = Objects.requireNonNull(audioFormat.getBitDepth());
+            int channels = Objects.requireNonNull(audioFormat.getChannels());
+            if (audioStream instanceof SizeableAudioStream sizeableAudioStream) {
+                long byteLength = sizeableAudioStream.length();
+                long bytesPerSecond = (bitDepth / 8) * (long) sampleRate * channels;
+                // a bit depth below 8 yields 0 here; skip the estimation instead of dividing by zero
+                if (bytesPerSecond > 0) {
+                    duration = Math.round(byteLength * 1000.0 / bytesPerSecond);
+                    logger.debug("Duration of input stream : {}", duration);
+                }
+            }
+            AtomicBoolean transferenceAborted = new AtomicBoolean(false);
+            if (audioStream instanceof PipedAudioStream pipedAudioStream) {
+                pipedAudioStream.onClose(() -> transferenceAborted.set(true));
+            }
+            // read the forced values once; the fields are not final
+            Integer forcedSampleRate = this.forceSampleRate;
+            Integer forcedBitDepth = this.forceBitDepth;
+            Integer forcedChannels = this.forceChannels;
+            int targetSampleRate = forcedSampleRate != null ? forcedSampleRate : sampleRate;
+            int targetBitDepth = forcedBitDepth != null ? forcedBitDepth : bitDepth;
+            int targetChannels = forcedChannels != null ? forcedChannels : channels;
+            outputStream = new PCMWebSocketOutputStream(websocket, targetSampleRate, (byte) targetBitDepth,
+                    (byte) targetChannels);
+            InputStream finalAudioStream;
+            if (targetSampleRate != sampleRate || targetBitDepth != bitDepth || targetChannels != channels) {
+                logger.debug("Sound is not in the target format. Trying to re-encode it");
+                finalAudioStream = PCMWebSocketAudioUtil.getPCMStreamNormalized(audioStream, sampleRate, bitDepth,
+                        channels, targetSampleRate, targetBitDepth, targetChannels);
+            } else {
+                finalAudioStream = audioStream;
+            }
+            // the chunk size refers to the data actually sent, hence the target format
+            int bytesPer500ms = (targetSampleRate * (targetBitDepth / 8) * targetChannels) / 2;
+            transferAudio(finalAudioStream, outputStream, bytesPer500ms, duration, transferenceAborted);
+        } catch (InterruptedIOException ignored) {
+            // the transfer was interrupted; nothing else to send
+        } catch (IOException e) {
+            logger.warn("IOException: {}", e.getMessage());
+        } catch (InterruptedException e) {
+            logger.warn("InterruptedException: {}", e.getMessage());
+            // keep the interruption visible to the caller
+            Thread.currentThread().interrupt();
+        } finally {
+            try {
+                audioStream.close();
+            } catch (IOException e) {
+                logger.warn("IOException: {}", e.getMessage(), e);
+            }
+            try {
+                if (outputStream != null) {
+                    outputStream.close();
+                }
+            } catch (IOException e) {
+                logger.warn("IOException: {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Copies the input to the output in chunks, always followed by the termination byte, and afterwards sleeps for
+     * the part of the given duration that has not elapsed yet.
+     *
+     * @param inputStream the audio data to send
+     * @param outputStream the stream writing to the websocket
+     * @param chunkSize maximum number of bytes per written chunk
+     * @param duration estimated playback time in milliseconds, or -1 if unknown
+     * @param aborted flag that stops the transfer once set
+     * @throws IOException if reading or writing the audio data fails
+     * @throws InterruptedException if interrupted while waiting for the playback time to elapse
+     */
+    private void transferAudio(InputStream inputStream, OutputStream outputStream, int chunkSize, long duration,
+            AtomicBoolean aborted) throws IOException, InterruptedException {
+        Instant start = Instant.now();
+        try {
+            byte[] buffer = new byte[chunkSize];
+            int read;
+            while (!aborted.get() && (read = inputStream.read(buffer, 0, chunkSize)) >= 0) {
+                outputStream.write(buffer, 0, read);
+            }
+        } finally {
+            try {
+                // send a byte indicating this stream has ended, so it can be torn down on the client
+                outputStream.write(new byte[] { STREAM_TERMINATION_BYTE }, 0, 1);
+            } catch (IOException e) {
+                logger.warn("Unable to send termination byte to sink {}", sinkId);
+            }
+        }
+        if (duration != -1) {
+            long millisSpentSendingAudio = Duration.between(start, Instant.now()).toMillis();
+            if (millisSpentSendingAudio < duration) {
+                long timeToSleep = duration - millisSpentSendingAudio;
+                logger.debug("Sleep time to let the system play sound : {}ms", timeToSleep);
+                Thread.sleep(timeToSleep);
+            }
+        }
+    }
+
+    @Override
+    public Set<AudioFormat> getSupportedFormats() {
+        return supportedFormats;
+    }
+
+    @Override
+    public Set<Class<? extends AudioStream>> getSupportedStreams() {
+        return supportedStreams;
+    }
+
+    @Override
+    public PercentType getVolume() throws IOException {
+        return this.sinkVolume;
+    }
+
+    @Override
+    public void setVolume(PercentType percentType) throws IOException {
+        this.sinkVolume = percentType;
+        websocket.setSinkVolume(percentType.intValue());
+    }
+
+    /**
+     * This is an {@link OutputStream} implementation for writing binary data to the websocket that
+     * will prefix each chunk with a header composed of 6 bytes.
+     * Header: 3 bytes (stream id) + 1 byte (stream sample rate) + 1 byte (stream bit depth) + 1 byte (channels).
+     */
+    protected static class PCMWebSocketOutputStream extends OutputStream {
+        private final byte[] header;
+        private final PCMWebSocketConnection websocket;
+        private boolean closed = false;
+
+        public PCMWebSocketOutputStream(PCMWebSocketConnection websocket, int sampleRate, byte bitDepth,
+                byte channels) {
+            this.websocket = websocket;
+            this.header = PCMWebSocketStreamIdUtil.generateAudioPacketHeader(sampleRate, bitDepth, channels).array();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            // OutputStream contract: only the eight low-order bits of b are written
+            write(new byte[] { (byte) b });
+        }
+
+        @Override
+        public void write(byte @Nullable [] b) throws IOException {
+            if (closed) {
+                throw new IOException("Stream closed");
+            }
+            if (b != null) {
+                websocket.sendAudio(header, b);
+            }
+        }
+
+        @Override
+        public void write(byte @Nullable [] b, int off, int len) throws IOException {
+            if (b != null) {
+                // each write becomes one websocket message, so send exactly the requested range
+                write(Arrays.copyOfRange(b, off, off + len));
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed = true;
+            super.close();
+        }
+    }
+}
diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSource.java
b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSource.java
new file mode 100644
index 00000000000..707be38564d
--- /dev/null
+++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioSource.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2010-2025 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.core.io.websocket.audiopcm;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.audio.AudioException;
+import org.openhab.core.audio.AudioFormat;
+import org.openhab.core.audio.AudioSource;
+import org.openhab.core.audio.AudioStream;
+import org.openhab.core.audio.PipedAudioStream;
+import org.openhab.core.common.ThreadPoolManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an {@link AudioSource} implementation connected to the {@link PCMWebSocketConnection} that receives
+ * a single PCM audio line through the websocket, which is shared across the active {@link AudioStream} instances.
+ * <p>
+ * Audio that does not arrive in the supported format is converted before it is handed to the streams.
+ *
+ * @author Miguel Álvarez Díez - Initial contribution
+ */
+@NonNullByDefault
+public class PCMWebSocketAudioSource implements AudioSource {
+    // final: supportedFormat is built once from these values, so they must not change afterwards
+    public static final int supportedBitDepth = 16;
+    public static final int supportedSampleRate = 16000;
+    public static final int supportedChannels = 1;
+    public static final AudioFormat supportedFormat = new AudioFormat(AudioFormat.CONTAINER_WAVE,
+            AudioFormat.CODEC_PCM_SIGNED, false, supportedBitDepth, null, (long) supportedSampleRate,
+            supportedChannels);
+
+    private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAudioSource.class);
+    private final String sourceId;
+    private final String sourceLabel;
+    private final PCMWebSocketConnection websocket;
+    private @Nullable PipedOutputStream sourceAudioPipedOutput;
+    private @Nullable PipedInputStream sourceAudioPipedInput;
+    private @Nullable InputStream sourceAudioStream;
+    private final PipedAudioStream.Group streamGroup = PipedAudioStream.newGroup(supportedFormat);
+    private @Nullable Future<?> sourceWriteTask;
+    private final ScheduledExecutorService scheduler = ThreadPoolManager.getScheduledPool("pcm-audio-source");
+    private byte @Nullable [] streamId;
+
+    public PCMWebSocketAudioSource(String id, String label, PCMWebSocketConnection websocket) {
+        this.sourceId = id;
+        this.sourceLabel = label;
+        this.websocket = websocket;
+    }
+
+    @Override
+    public String getId() {
+        return this.sourceId;
+    }
+
+    @Override
+    public String getLabel(@Nullable Locale locale) {
+        return this.sourceLabel;
+    }
+
+    @Override
+    public Set<AudioFormat> getSupportedFormats() {
+        return Set.of(supportedFormat);
+    }
+
+    /**
+     * Creates a new stream in the shared group. When it is the only stream of the group the client is asked to start
+     * sending audio.
+     */
+    @Override
+    public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
+        try {
+            final PipedAudioStream stream = streamGroup.getAudioStreamInGroup();
+            synchronized (streamGroup) {
+                if (this.streamGroup.size() == 1) {
+                    logger.debug("Send start listening {}", getId());
+                    this.streamId = null;
+                    websocket.setListening(true);
+                }
+            }
+            stream.onClose(this::onStreamClose);
+            return stream;
+        } catch (IOException e) {
+            throw new AudioException(e);
+        }
+    }
+
+    public void close() throws Exception {
+        streamGroup.close();
+    }
+
+    /**
+     * Hands audio data received from the client to the active streams, converting it to the supported format when
+     * needed. Data is dropped when there is no active stream or when it belongs to a different line than the one
+     * currently being handled.
+     *
+     * @param id identifier of the audio line the data belongs to
+     * @param sampleRate sample rate of the payload
+     * @param bitDepth bit depth of the payload
+     * @param channels channel count of the payload
+     * @param payload the PCM data
+     */
+    public void writeToStreams(byte[] id, int sampleRate, int bitDepth, int channels, byte[] payload) {
+        if (streamGroup.isEmpty()) {
+            logger.debug("Source already disposed ignoring data");
+            return;
+        }
+        byte[] currentStreamId = this.streamId;
+        if (currentStreamId == null) {
+            this.streamId = id;
+        } else if (!Arrays.equals(currentStreamId, id)) {
+            // source is only ready to handle a single data line
+            logger.warn("Ignoring data from source stream {}", Arrays.toString(id));
+            return;
+        }
+        boolean needsConvert = sampleRate != supportedSampleRate || bitDepth != supportedBitDepth
+                || channels != supportedChannels;
+        if (!needsConvert) {
+            streamGroup.write(payload);
+            return;
+        }
+        PipedOutputStream pipedOutput;
+        try {
+            pipedOutput = getOrCreateConversionPipe(sampleRate, bitDepth, channels);
+        } catch (IOException e) {
+            logger.error("Unable to setup audio source stream", e);
+            return;
+        }
+        try {
+            pipedOutput.write(payload);
+        } catch (IOException e) {
+            logger.error("Error converting source audio format", e);
+        }
+    }
+
+    /**
+     * Returns the pipe feeding the format converter, setting up the pipe, the converted stream and the task that
+     * forwards the converted audio if they do not exist yet.
+     */
+    private PipedOutputStream getOrCreateConversionPipe(int sampleRate, int bitDepth, int channels)
+            throws IOException {
+        // same lock as onStreamClose, which tears these resources down
+        synchronized (streamGroup) {
+            PipedOutputStream existingOutput = this.sourceAudioPipedOutput;
+            if (existingOutput != null && this.sourceAudioStream != null) {
+                return existingOutput;
+            }
+            PipedOutputStream pipedOutput = new PipedOutputStream();
+            PipedInputStream pipedInput = new PipedInputStream(pipedOutput,
+                    (sampleRate * (bitDepth / 8) * channels) * 2);
+            logger.debug(
+                    "Enabling converting pcm audio for the audio source stream: sample rate {}, bit depth {}, channels {} => sample rate {}, bit depth {}, channels {}",
+                    sampleRate, bitDepth, channels, supportedSampleRate, supportedBitDepth, supportedChannels);
+            InputStream convertedStream = PCMWebSocketAudioUtil.getPCMStreamNormalized(pipedInput, sampleRate,
+                    bitDepth, channels, supportedSampleRate, supportedBitDepth, supportedChannels);
+            this.sourceAudioPipedOutput = pipedOutput;
+            this.sourceAudioPipedInput = pipedInput;
+            this.sourceAudioStream = convertedStream;
+            // the task gets its own reference, the field may be cleared while it is running
+            this.sourceWriteTask = scheduler.submit(() -> forwardConvertedAudio(convertedStream));
+            return pipedOutput;
+        }
+    }
+
+    /**
+     * Reads the converted audio in chunks of 250ms and writes them to the active streams until the task is
+     * interrupted, the pipe gets closed or the end of the stream is reached.
+     */
+    private void forwardConvertedAudio(InputStream convertedStream) {
+        int bytesPer250ms = (supportedSampleRate * (supportedBitDepth / 8) * supportedChannels) / 4;
+        while (!Thread.currentThread().isInterrupted()) {
+            byte[] convertedPayload;
+            try {
+                convertedPayload = convertedStream.readNBytes(bytesPer250ms);
+            } catch (InterruptedIOException e) {
+                // the task was cancelled while waiting for data
+                return;
+            } catch (IOException e) {
+                String message = e.getMessage();
+                if (message != null && message.contains("Pipe closed")) {
+                    return;
+                }
+                logger.error("Error reading converted audio data", e);
+                continue;
+            }
+            if (convertedPayload.length == 0) {
+                // readNBytes only returns no data at the end of the stream
+                return;
+            }
+            streamGroup.write(convertedPayload);
+        }
+    }
+
+    /**
+     * Invoked when one of the streams gets closed. Once no stream is left the client is asked to stop sending audio
+     * and the conversion resources are released.
+     */
+    private void onStreamClose() {
+        logger.debug("Unregister source audio stream for '{}'", getId());
+        synchronized (streamGroup) {
+            if (!streamGroup.isEmpty()) {
+                return;
+            }
+            logger.debug("Send stop listening {}", getId());
+            websocket.setListening(false);
+            Future<?> writeTask = this.sourceWriteTask;
+            if (writeTask != null) {
+                writeTask.cancel(true);
+                this.sourceWriteTask = null;
+            }
+            closeQuietly(this.sourceAudioStream);
+            this.sourceAudioStream = null;
+            closeQuietly(this.sourceAudioPipedOutput);
+            this.sourceAudioPipedOutput = null;
+            closeQuietly(this.sourceAudioPipedInput);
+            this.sourceAudioPipedInput = null;
+            this.streamId = null;
+        }
+    }
+
+    /**
+     * Closes the resource if present, ignoring failures as there is nothing left to recover at that point.
+     */
+    private void closeQuietly(@Nullable Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+}
diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioUtil.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioUtil.java
new file mode 100644
index 00000000000..c6495925077
--- /dev/null
+++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketAudioUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2010-2025 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.core.io.websocket.audiopcm;
+
+import java.io.InputStream;
+
+import javax.sound.sampled.AudioInputStream;
+import javax.sound.sampled.AudioSystem;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Audio utils.
+ *
+ * @author Miguel Álvarez Díez - Initial contribution
+ */
+@NonNullByDefault
+public final class PCMWebSocketAudioUtil {
+
+    private PCMWebSocketAudioUtil() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Ensures the right PCM format by converting the stream to the target sample rate, bit depth and channel count.
+     * Both the input and the returned stream are handled as signed little-endian PCM.
+     *
+     * @param stream PCM input stream
+     * @param sampleRate sample rate of the input stream
+     * @param bitDepth bit depth of the input stream
+     * @param channels channel count of the input stream
+     * @param targetSampleRate sample rate of the returned stream
+     * @param targetBitDepth bit depth of the returned stream
+     * @param targetChannels channel count of the returned stream
+     * @return a PCM stream in the target format
+     * @throws IllegalArgumentException if the audio system does not support the conversion
+     */
+    public static AudioInputStream getPCMStreamNormalized(InputStream stream, int sampleRate, int bitDepth,
+            int channels, int targetSampleRate, int targetBitDepth, int targetChannels) {
+        javax.sound.sampled.AudioFormat sourceFormat = signedLittleEndianPcm(sampleRate, bitDepth, channels);
+        javax.sound.sampled.AudioFormat targetFormat = signedLittleEndianPcm(targetSampleRate, targetBitDepth,
+                targetChannels);
+        // the length of the incoming stream is unknown
+        AudioInputStream sourceStream = new AudioInputStream(stream, sourceFormat, AudioSystem.NOT_SPECIFIED);
+        return AudioSystem.getAudioInputStream(targetFormat, sourceStream);
+    }
+
+    /**
+     * Builds the Java Sound description of a signed little-endian PCM format.
+     */
+    private static javax.sound.sampled.AudioFormat signedLittleEndianPcm(int sampleRate, int bitDepth,
+            int channels) {
+        return new javax.sound.sampled.AudioFormat( //
+                (float) sampleRate, //
+                bitDepth, //
+                channels, //
+                true, // signed
+                false // little-endian
+        );
+    }
+}
diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketConnection.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketConnection.java
new file mode 100644
index 00000000000..8a2bd117f5d
--- /dev/null
+++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketConnection.java
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2010-2025 Contributors to the openHAB
project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.io.websocket.audiopcm; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioSource; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * The {@link PCMWebSocketConnection} represents a WebSocket connection used to transmit pcm audio + *

+ * The websocket uses the text protocol to send commands represented by {@link WebSocketCommand} and the binary + * protocol to transmit the audio data. + *

+ * The websocket supports only one line for the {@link PCMWebSocketAudioSource} (the audio is shared on the server); + * the data transmission is controlled by the server (START_LISTENING and STOP_LISTENING commands). + *

+ * The websocket supports multiple lines for the {@link PCMWebSocketAudioSink} (to accomplis that, the outgoing data + * chucks are prefixed with a 6 byte header to transmit the identity and format specification, check + * {@link PCMWebSocketAudioSink.PCMWebSocketOutputStream}) + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@WebSocket +@NonNullByDefault +@SuppressWarnings("unused") +public class PCMWebSocketConnection implements WebSocketListener { + private final Logger logger = LoggerFactory.getLogger(PCMWebSocketConnection.class); + protected final Map> audioComponentRegistrations = new ConcurrentHashMap<>(); + private volatile @Nullable Session session; + private @Nullable RemoteEndpoint remote; + private final PCMWebSocketAdapter wsAdapter; + private final ScheduledExecutorService executor; + private @Nullable ScheduledFuture scheduledDisconnection; + + private boolean initialized = false; + private @Nullable Runnable dialogTrigger = null; + + private final ObjectMapper jsonMapper = new ObjectMapper(); + private String id = ""; + private @Nullable PCMWebSocketAudioSource audioSource = null; + + public PCMWebSocketConnection(PCMWebSocketAdapter wsAdapter, ScheduledExecutorService executor) { + this.wsAdapter = wsAdapter; + this.executor = executor; + } + + public void sendAudio(byte[] id, byte[] b) { + try { + var remote = getRemote(); + if (remote != null) { + // concat stream identifier and send + ByteBuffer buff = ByteBuffer.wrap(new byte[id.length + b.length]); + buff.put(id); + buff.put(b); + remote.sendBytesByFuture(ByteBuffer.wrap(buff.array())); + } + } catch (IllegalStateException ignored) { + logger.warn("Unable to send audio buffer"); + } + } + + public void setListening(boolean listening) { + sendClientCommand(new WebSocketCommand(listening ? 
WebSocketCommand.OutputCommands.START_LISTENING + : WebSocketCommand.OutputCommands.STOP_LISTENING)); + } + + public void disconnect() { + var session = getSession(); + if (session != null) { + try { + session.disconnect(); + } catch (IOException ignored) { + } + } + } + + private void spot() { + onRemoteSpot(); + } + + @Override + public void onWebSocketConnect(@Nullable Session sess) { + if (sess == null) { + // never + return; + } + this.session = sess; + this.remote = sess.getRemote(); + logger.info("New client connected."); + scheduledDisconnection = executor.schedule(() -> { + try { + sess.disconnect(); + } catch (IOException ignored) { + } + }, 5, TimeUnit.SECONDS); + } + + private void sendClientCommand(T msg) { + var remote = getRemote(); + if (remote != null) { + try { + remote.sendStringByFuture(new ObjectMapper().writeValueAsString(msg)); + } catch (JsonProcessingException e) { + logger.warn("JsonProcessingException writing JSON message: ", e); + } + } + } + + @Override + public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) { + logger.trace("Received binary data of length {}", len); + PCMWebSocketAudioSource audioSource = this.audioSource; + if (payload != null && audioSource != null) { + var streamData = PCMWebSocketStreamIdUtil.parseAudioPacket(payload); + audioSource.writeToStreams(streamData.id(), streamData.sampleRate(), streamData.bitDepth(), + streamData.channels(), streamData.audioData()); + } + } + + @Override + public void onWebSocketText(@Nullable String message) { + try { + var rootMessageNode = jsonMapper.readTree(message); + if (rootMessageNode.has("cmd")) { + try { + var cmd = rootMessageNode.get("cmd").asText().trim().toUpperCase(); + logger.debug("Handling msg '{}'", cmd); + var messageType = WebSocketCommand.InputCommands.valueOf(cmd); + switch (messageType) { + case INITIALIZE -> { + wsAdapter.onSpeakerConnected(this); + JsonNode argsNode = rootMessageNode.get("args"); + var clientOptions = 
jsonMapper.treeToValue(argsNode, ConnectionOptions.class); + var scheduledDisconnection = this.scheduledDisconnection; + if (scheduledDisconnection != null) { + scheduledDisconnection.cancel(true); + } + // update connection settings + id = clientOptions.id; + registerSpeakerComponents(id, clientOptions); + sendClientCommand(new WebSocketCommand(WebSocketCommand.OutputCommands.INITIALIZED)); + } + case ON_SPOT -> onRemoteSpot(); + } + + } catch (IOException | IllegalStateException e) { + logger.warn("Disconnecting client: {}", e.getMessage()); + disconnect(); + } + } + } catch (JsonProcessingException e) { + logger.warn("Exception parsing JSON message: ", e); + } + } + + @Override + public void onWebSocketError(@Nullable Throwable cause) { + logger.warn("WebSocket Error: ", cause); + } + + @Override + public void onWebSocketClose(int statusCode, @Nullable String reason) { + this.session = null; + this.remote = null; + logger.debug("Session closed with code {}: {}", statusCode, reason); + wsAdapter.onClientDisconnected(this); + unregisterSpeakerComponents(id); + } + + public void setSinkVolume(int value) { + if (initialized) { + sendClientCommand( + new WebSocketCommand(WebSocketCommand.OutputCommands.SINK_VOLUME, Map.of("value", value))); + } + } + + public void setSourceVolume(int value) { + if (initialized) { + sendClientCommand( + new WebSocketCommand(WebSocketCommand.OutputCommands.SOURCE_VOLUME, Map.of("value", value))); + } + } + + public @Nullable RemoteEndpoint getRemote() { + return this.remote; + } + + public @Nullable Session getSession() { + return this.session; + } + + public boolean isConnected() { + Session sess = this.session; + return sess != null && sess.isOpen(); + } + + private synchronized void registerSpeakerComponents(String id, ConnectionOptions clientOptions) throws IOException { + if (id.isBlank()) { + throw new IOException("Unable to register audio components"); + } + String label = "UI (" + id + ")"; + logger.debug("Registering dialog 
components for '{}'", id); + this.initialized = true; + // register source + var audioSource = this.audioSource = new PCMWebSocketAudioSource(getSourceId(id), label, this); + logger.debug("Registering audio source {}", this.audioSource.getId()); + audioComponentRegistrations.put(this.audioSource.getId(), wsAdapter.bundleContext + .registerService(AudioSource.class.getName(), this.audioSource, new Hashtable<>())); + // register sink + var audioSink = new PCMWebSocketAudioSink(getSinkId(id), label, this, clientOptions.forceSampleRate, + clientOptions.forceBitDepth, clientOptions.forceChannels); + logger.debug("Registering audio sink {}", audioSink.getId()); + audioComponentRegistrations.put(audioSink.getId(), + wsAdapter.bundleContext.registerService(AudioSink.class.getName(), audioSink, new Hashtable<>())); + // init dialog + if (clientOptions.startDialog) { + var dialogProvider = this.wsAdapter.dialogProvider; + if (dialogProvider == null) { + throw new IOException("Voice functionality is not ready"); + } + dialogTrigger = dialogProvider.startDialog(this, audioSink, audioSource, + !clientOptions.locationItem.isBlank() ? clientOptions.locationItem : null, + !clientOptions.listeningItem.isBlank() ? 
clientOptions.listeningItem : null); + } else { + dialogTrigger = null; + } + } + + private synchronized void unregisterSpeakerComponents(String id) { + initialized = false; + dialogTrigger = null; + var source = wsAdapter.audioManager.getSource(getSourceId(id)); + if (source instanceof PCMWebSocketAudioSource hsAudioSource) { + try { + hsAudioSource.close(); + } catch (Exception ignored) { + } + } + if (source != null) { + ServiceRegistration sourceReg = audioComponentRegistrations.remove(source.getId()); + if (sourceReg != null) { + logger.debug("Unregistering audio source {}", source.getId()); + sourceReg.unregister(); + } + } + var sink = wsAdapter.audioManager.getSink(getSinkId(id)); + if (sink != null) { + ServiceRegistration sinkReg = audioComponentRegistrations.remove(sink.getId()); + if (sinkReg != null) { + logger.debug("Unregistering audio sink {}", sink.getId()); + sinkReg.unregister(); + } + } + } + + private void onRemoteSpot() { + var dialogTrigger = this.dialogTrigger; + if (dialogTrigger != null) { + dialogTrigger.run(); + } + } + + private String getSinkId(String id) { + return "pcm::" + id + "::sink"; + } + + private String getSourceId(String id) { + return "pcm::" + id + "::source"; + } + + public String getId() { + return id; + } + + private static class WebSocketCommand { + public String cmd = ""; + public Map args; + + public WebSocketCommand(OutputCommands cmd) { + this(cmd, new HashMap<>()); + } + + public WebSocketCommand(OutputCommands cmd, Map args) { + this.cmd = cmd.name(); + this.args = args; + } + + public enum OutputCommands { + INITIALIZED, + START_LISTENING, + STOP_LISTENING, + SINK_VOLUME, + SOURCE_VOLUME, + } + + public enum InputCommands { + INITIALIZE, + ON_SPOT, + } + } + + /** + * The {@link ConnectionOptions} represents the options provided by the ws client. 
+ */ + public static class ConnectionOptions { + /** + * Identifier to concatenate to related services (dialog, source and sick) + */ + public String id = ""; + /** + * Force sink audio sample rate (resample in server) + */ + public @Nullable Integer forceSampleRate; + /** + * Force sink audio bit depth (resample in server) + */ + public @Nullable Integer forceBitDepth; + /** + * Force sink audio channels (resample in server) + */ + public @Nullable Integer forceChannels; + /** + * Start a dialog processor using the registered audio components + */ + public boolean startDialog = false; + /** + * Listening item for the dialog + */ + public String listeningItem = ""; + /** + * Location item for the dialog + */ + public String locationItem = ""; + + public ConnectionOptions() { + } + } +} diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketDialogProvider.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketDialogProvider.java new file mode 100644 index 00000000000..299c72c231d --- /dev/null +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketDialogProvider.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2010-2025 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.io.websocket.audiopcm; + +import java.util.Locale; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioSource; +import org.openhab.core.voice.KSEdgeService; +import org.openhab.core.voice.KSException; +import org.openhab.core.voice.KSListener; +import org.openhab.core.voice.KSServiceHandle; +import org.openhab.core.voice.KSpottedEvent; +import org.openhab.core.voice.VoiceManager; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * This component provides dialog support to the {@link PCMWebSocketAdapter}. + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@NonNullByDefault +@Component(immediate = true) +@SuppressWarnings("unused") +public class PCMWebSocketDialogProvider implements PCMWebSocketAdapter.DialogProvider { + private final VoiceManager voiceManager; + + @Activate + public PCMWebSocketDialogProvider(@Reference VoiceManager voiceManager, + @Reference PCMWebSocketAdapter pcmWebSocketAdapter) { + this.voiceManager = voiceManager; + pcmWebSocketAdapter.setDialogProvider(this); + } + + /** + * Starts a dialog and returns a runnable instance that triggers the dialog. 
+ * + * @param sink the audio sink + * @param source the audio source + * @return a runnable that triggers the dialog + */ + public Runnable startDialog(PCMWebSocketConnection webSocket, AudioSink sink, AudioSource source, + @Nullable String locationItem, @Nullable String listeningItem) { + var ks = new WebSocketKeywordSpotter(webSocket::disconnect); + voiceManager.startDialog( // + voiceManager.getDialogContextBuilder() // + .withSource(source) // + .withSink(sink) // + .withKS(ks) // + .withLocationItem(locationItem) // + .withListeningItem(listeningItem) // + .build() // + ); + return ks::trigger; + } + + /** + * Anonymous keyword spotter used to trigger the dialog + */ + private static class WebSocketKeywordSpotter implements KSEdgeService { + private final Runnable onAbort; + private @Nullable KSListener ksListener = null; + + public WebSocketKeywordSpotter(Runnable onAbort) { + this.onAbort = onAbort; + } + + public void trigger() { + var ksListener = this.ksListener; + if (ksListener != null) { + ksListener.ksEventReceived(new KSpottedEvent()); + } + } + + @Override + public KSServiceHandle spot(KSListener ksListener) throws KSException { + this.ksListener = ksListener; + return () -> { + if (ksListener.equals(this.ksListener)) { + this.ksListener = null; + this.onAbort.run(); + } + }; + } + + @Override + public String getId() { + return "pcmws::anonymous::ks"; + } + + @Override + public String getLabel(@Nullable Locale locale) { + // never shown + return "Anonymous"; + } + } +} diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketStreamIdUtil.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketStreamIdUtil.java new file mode 100644 index 00000000000..5a037275ffc --- /dev/null +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/audiopcm/PCMWebSocketStreamIdUtil.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 
2010-2025 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.io.websocket.audiopcm; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.SecureRandom; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * Utils to read/write the audio packets send though the websocket binary protocol. + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@NonNullByDefault +public class PCMWebSocketStreamIdUtil { + /** + * Packet header length in bytes: + * 2 for id + * 4 for sample rate as int little-endian + * 1 for bitDepth + * 1 for channels + */ + public static int packetHeaderByteLength = 2 + 4 + 1 + 1; + + public static AudioPacketData parseAudioPacket(byte[] bytes) { + assert (bytes.length >= packetHeaderByteLength); + var byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + byte[] idBytes = new byte[] { byteBuffer.get(), byteBuffer.get() }; + int sampleRate = byteBuffer.getInt(); + byte bitDepth = byteBuffer.get(); + byte channels = byteBuffer.get(); + byte[] data = new byte[byteBuffer.remaining()]; + byteBuffer.get(data); + return new AudioPacketData(idBytes, sampleRate, bitDepth, channels, data); + } + + public static ByteBuffer generateAudioPacketHeader(int sampleRate, byte bitDepth, byte channels) { + var byteBuffer = ByteBuffer.allocate(packetHeaderByteLength).order(ByteOrder.LITTLE_ENDIAN); + SecureRandom sr = new SecureRandom(); + byte[] rndBytes = new byte[2]; + sr.nextBytes(rndBytes); + byteBuffer.put(rndBytes[0]); + byteBuffer.put(rndBytes[1]); + byteBuffer.putInt(sampleRate); + byteBuffer.put(bitDepth); + byteBuffer.put(channels); + return byteBuffer; + } + + public 
record AudioPacketData(byte[] id, int sampleRate, byte bitDepth, byte channels, byte[] audioData) { + } +} diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/KSEdgeService.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/KSEdgeService.java index 8c7f4338094..25fe4bc7dbf 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/KSEdgeService.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/KSEdgeService.java @@ -13,8 +13,10 @@ package org.openhab.core.voice; import java.util.Locale; +import java.util.Set; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioStream; /** @@ -41,4 +43,14 @@ default KSServiceHandle spot(KSListener ksListener, AudioStream audioStream, Loc throws KSException { throw new KSException("An edge keyword spotter is not meant to process audio in the server"); } + + @Override + default Set getSupportedFormats() { + return Set.of(); + } + + @Override + default Set getSupportedLocales() { + return Set.of(); + } } diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java index d6288da8227..3a80550b105 100644 --- a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/internal/DialogProcessor.java @@ -114,7 +114,7 @@ public DialogProcessor(DialogContext context, DialogEventListener eventListener, this.activeDialogGroups = activeDialogGroups; this.bundle = bundle; var ks = context.ks(); - this.ksFormat = ks != null + this.ksFormat = ks != null && !(ks instanceof KSEdgeService) ? 
VoiceManagerImpl.getBestMatch(context.source().getSupportedFormats(), ks.getSupportedFormats()) : null; this.sttFormat = VoiceManagerImpl.getBestMatch(context.source().getSupportedFormats(), @@ -159,7 +159,7 @@ public void start() throws IllegalStateException { abortKS(); closeStreamKS(); AudioFormat fmt = ksFormat; - if (fmt == null) { + if (fmt == null && !(ksService instanceof KSEdgeService)) { logger.warn("No compatible audio format found for ks '{}' and source '{}'", ksService.getId(), dialogContext.source().getId()); return;