diff --git a/.changeset/heavy-cats-unite.md b/.changeset/heavy-cats-unite.md
new file mode 100644
index 00000000..afda602d
--- /dev/null
+++ b/.changeset/heavy-cats-unite.md
@@ -0,0 +1,5 @@
+---
+"@livekit/agents": patch
+---
+
+Publish transcriptions additionally via text stream APIs
diff --git a/agents/src/constants.ts b/agents/src/constants.ts
new file mode 100644
index 00000000..66975036
--- /dev/null
+++ b/agents/src/constants.ts
@@ -0,0 +1,7 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+export const ATTRIBUTE_TRANSCRIPTION_TRACK_ID = 'lk.transcribed_track_id';
+export const ATTRIBUTE_TRANSCRIPTION_FINAL = 'lk.transcription_final';
+export const TOPIC_TRANSCRIPTION = 'lk.transcription';
+export const TOPIC_CHAT = 'lk.chat';
diff --git a/agents/src/multimodal/multimodal_agent.ts b/agents/src/multimodal/multimodal_agent.ts
index 6d04adf4..19edde96 100644
--- a/agents/src/multimodal/multimodal_agent.ts
+++ b/agents/src/multimodal/multimodal_agent.ts
@@ -20,6 +20,11 @@ import {
 } from '@livekit/rtc-node';
 import { EventEmitter } from 'node:events';
 import { AudioByteStream } from '../audio.js';
+import {
+  ATTRIBUTE_TRANSCRIPTION_FINAL,
+  ATTRIBUTE_TRANSCRIPTION_TRACK_ID,
+  TOPIC_TRANSCRIPTION,
+} from '../constants.js';
 import * as llm from '../llm/index.js';
 import { log } from '../log.js';
 import type { MultimodalLLMMetrics } from '../metrics/base.js';
@@ -251,8 +256,8 @@ export class MultimodalAgent extends EventEmitter {
       if (message.contentType === 'text') return;
 
       const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions);
-      synchronizer.on('textUpdated', (text) => {
-        this.#publishTranscription(
+      synchronizer.on('textUpdated', async (text) => {
+        await this.#publishTranscription(
           this.room!.localParticipant!.identity!,
           this.#getLocalTrackSid()!,
           text.text,
@@ -302,25 +307,31 @@ export class MultimodalAgent extends EventEmitter {
       });
 
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      this.#session.on('input_speech_committed', (ev: any) => {
+      this.#session.on('input_speech_committed', async (ev: any) => {
        // openai.realtime.InputSpeechCommittedEvent
        const participantIdentity = this.linkedParticipant?.identity;
        const trackSid = this.subscribedTrack?.sid;
        if (participantIdentity && trackSid) {
-          this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
+          await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
        } else {
          this.#logger.error('Participant or track not set');
        }
      });
 
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      this.#session.on('input_speech_transcription_completed', (ev: any) => {
+      this.#session.on('input_speech_transcription_completed', async (ev: any) => {
        // openai.realtime.InputSpeechTranscriptionCompletedEvent
        const transcription = ev.transcript;
        const participantIdentity = this.linkedParticipant?.identity;
        const trackSid = this.subscribedTrack?.sid;
        if (participantIdentity && trackSid) {
-          this.#publishTranscription(participantIdentity, trackSid, transcription, true, ev.itemId);
+          await this.#publishTranscription(
+            participantIdentity,
+            trackSid,
+            transcription,
+            true,
+            ev.itemId,
+          );
        } else {
          this.#logger.error('Participant or track not set');
        }
      });
@@ -332,7 +343,7 @@ export class MultimodalAgent extends EventEmitter {
        this.#logger.child({ transcription }).debug('committed user speech');
      });
 
-      this.#session.on('input_speech_started', (ev: any) => {
+      this.#session.on('input_speech_started', async (ev: any) => {
        this.emit('user_started_speaking');
        if (this.#playingHandle && !this.#playingHandle.done) {
          this.#playingHandle.interrupt();
@@ -349,7 +360,7 @@ export class MultimodalAgent extends EventEmitter {
        const participantIdentity = this.linkedParticipant?.identity;
        const trackSid = this.subscribedTrack?.sid;
        if (participantIdentity && trackSid) {
-          this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
+          await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
        }
      });
@@ -475,13 +486,13 @@ export class MultimodalAgent extends EventEmitter {
     return this.#localTrackSid;
   }
 
-  #publishTranscription(
+  async #publishTranscription(
     participantIdentity: string,
     trackSid: string,
     text: string,
     isFinal: boolean,
     id: string,
-  ): void {
+  ): Promise<void> {
     this.#logger.debug(
       `Publishing transcription ${participantIdentity} ${trackSid} ${text} ${isFinal} ${id}`,
     );
@@ -504,6 +515,17 @@ export class MultimodalAgent extends EventEmitter {
         },
       ],
     });
+
+    const stream = await this.room.localParticipant.streamText({
+      topic: TOPIC_TRANSCRIPTION,
+      senderIdentity: participantIdentity,
+      attributes: {
+        [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid,
+        [ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(),
+      },
+    });
+    await stream.write(text);
+    await stream.close();
   }
 
   #updateState() {
diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts
index 39023410..f83f4e05 100644
--- a/agents/src/pipeline/pipeline_agent.ts
+++ b/agents/src/pipeline/pipeline_agent.ts
@@ -17,6 +17,11 @@ import {
 } from '@livekit/rtc-node';
 import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
 import { randomUUID } from 'node:crypto';
 import EventEmitter from 'node:events';
+import {
+  ATTRIBUTE_TRANSCRIPTION_FINAL,
+  ATTRIBUTE_TRANSCRIPTION_TRACK_ID,
+  TOPIC_TRANSCRIPTION,
+} from '../constants.js';
 import type {
   CallableFunctionResult,
   FunctionCallInfo,
@@ -518,28 +523,21 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
      this.emit(VPAEvent.USER_STOPPED_SPEAKING);
      this.#deferredValidation.onHumanEndOfSpeech(event);
    });
-    this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, (event) => {
+    this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, async (event) => {
      if (!this.#transcriptionId) {
        this.#transcriptionId = randomUUID();
      }
      this.#transcribedInterimText = event.alternatives![0].text;
-      this.#room!.localParticipant!.publishTranscription({
-        participantIdentity: this.#humanInput!.participant.identity,
-        trackSid: this.#humanInput!.subscribedTrack!.sid!,
-        segments: [
-          {
-            text: this.#transcribedInterimText,
-            id: this.#transcriptionId,
-            final: true,
-            startTime: BigInt(0),
-            endTime: BigInt(0),
-            language: '',
-          },
-        ],
-      });
+      await this.#publishTranscription(
+        this.#humanInput!.participant.identity,
+        this.#humanInput!.subscribedTrack!.sid!,
+        this.#transcribedInterimText,
+        false,
+        this.#transcriptionId,
+      );
    });
 
-    this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, (event) => {
+    this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, async (event) => {
      const newTranscript = event.alternatives![0].text;
      if (!newTranscript) return;
 
@@ -550,20 +548,14 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
      this.#lastFinalTranscriptTime = Date.now();
      this.transcribedText += (this.transcribedText ? ' ' : '') + newTranscript;
-      this.#room!.localParticipant!.publishTranscription({
-        participantIdentity: this.#humanInput!.participant.identity,
-        trackSid: this.#humanInput!.subscribedTrack!.sid!,
-        segments: [
-          {
-            text: this.transcribedText,
-            id: this.#transcriptionId,
-            final: true,
-            startTime: BigInt(0),
-            endTime: BigInt(0),
-            language: '',
-          },
-        ],
-      });
+      await this.#publishTranscription(
+        this.#humanInput!.participant.identity,
+        this.#humanInput!.subscribedTrack!.sid!,
+        this.transcribedText,
+        true,
+        this.#transcriptionId,
+      );
+      this.#transcriptionId = undefined;
 
      if (
@@ -894,18 +886,54 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
     handle.setDone();
   }
 
+  async #publishTranscription(
+    participantIdentity: string,
+    trackSid: string,
+    text: string,
+    isFinal: boolean,
+    id: string,
+  ) {
+    this.#room!.localParticipant!.publishTranscription({
+      participantIdentity: participantIdentity,
+      trackSid: trackSid,
+      segments: [
+        {
+          text: text,
+          final: isFinal,
+          id: id,
+          startTime: BigInt(0),
+          endTime: BigInt(0),
+          language: '',
+        },
+      ],
+    });
+    const stream = await this.#room!.localParticipant!.streamText({
+      senderIdentity: participantIdentity,
+      topic: TOPIC_TRANSCRIPTION,
+      attributes: {
+        [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid,
+        [ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(),
+      },
+    });
+    await stream.write(text);
+    await stream.close();
+  }
+
   #synthesizeAgentSpeech(
     speechId: string,
     source: string | LLMStream | AsyncIterable<string>,
   ): SynthesisHandle {
     const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions);
-    synchronizer.on('textUpdated', (text) => {
+    // TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API
+    synchronizer.on('textUpdated', async (text) => {
      this.#agentTranscribedText = text.text;
-      this.#room!.localParticipant!.publishTranscription({
-        participantIdentity: this.#room!.localParticipant!.identity,
-        trackSid: this.#agentPublication!.sid!,
-        segments: [text],
-      });
+      await this.#publishTranscription(
+        this.#room!.localParticipant!.identity!,
+        this.#agentPublication?.sid ?? '',
+        text.text,
+        text.final,
+        text.id,
+      );
    });
 
    if (!this.#agentOutput) {
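
Note: with this patch, each transcription segment is published twice: once through the existing publishTranscription API, and once as a text stream on the lk.transcription topic, with the transcribed track SID and the finality flag carried as stream attributes. For context, a minimal consumer-side sketch, assuming the text stream handler API in the livekit-client JS SDK (exact handler and reader signatures may vary by SDK version; variable names are illustrative):

import { Room } from 'livekit-client';

const room = new Room();
// ... connect with room.connect(url, token) before expecting streams ...

// Receive transcription streams published on the topic defined as
// TOPIC_TRANSCRIPTION in agents/src/constants.ts.
room.registerTextStreamHandler('lk.transcription', async (reader, participantInfo) => {
  // Stream attributes mirror the constants introduced in this diff.
  const trackSid = reader.info.attributes?.['lk.transcribed_track_id'];
  const isFinal = reader.info.attributes?.['lk.transcription_final'] === 'true';
  const text = await reader.readAll();
  console.log(`${participantInfo.identity} [track ${trackSid}] final=${isFinal}: ${text}`);
});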