AssemblyAI Plugin #315

Open · wants to merge 3 commits into base: next
16 changes: 16 additions & 0 deletions plugins/assemblyai/README.md
@@ -0,0 +1,16 @@
<!--
SPDX-FileCopyrightText: 2024 LiveKit, Inc.

SPDX-License-Identifier: Apache-2.0
-->
# AssemblyAI plugin for LiveKit Agents

The Agents Framework is designed for building realtime, programmable
participants that run on servers. Use it to create conversational, multi-modal
voice agents that can see, hear, and understand.

This package contains the AssemblyAI plugin, which provides streaming speech recognition (STT).
Refer to the [documentation](https://docs.livekit.io/agents/overview/) for
information on how to use it.
See the [repository](https://github.com/livekit/agents-js) for more information
about the framework as a whole.
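
## Usage

A minimal sketch of constructing the STT (assuming `ASSEMBLY_AI_KEY` is set in
the environment; the option values shown are illustrative):

```typescript
import { STT } from '@livekit/agents-plugin-assemblyai';

// Streaming speech-to-text; `keywords` entries are boosted during recognition.
const sttInstance = new STT({
  sampleRate: 16000,
  keywords: [['LiveKit', 1]],
});
const stream = sttInstance.stream();
```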
20 changes: 20 additions & 0 deletions plugins/assemblyai/api-extractor.json
@@ -0,0 +1,20 @@
/**
* Config file for API Extractor. For more info, please visit: https://api-extractor.com
*/
{
"$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json",

/**
* Optionally specifies another JSON config file that this file extends from. This provides a way for
* standard settings to be shared across multiple projects.
*
* If the path starts with "./" or "../", the path is resolved relative to the folder of the file that contains
* the "extends" field. Otherwise, the first path segment is interpreted as an NPM package name, and will be
* resolved using NodeJS require().
*
* SUPPORTED TOKENS: none
* DEFAULT VALUE: ""
*/
"extends": "../../api-extractor-shared.json",
"mainEntryPointFilePath": "./dist/index.d.ts"
}
48 changes: 48 additions & 0 deletions plugins/assemblyai/package.json
@@ -0,0 +1,48 @@
{
"name": "@livekit/agents-plugin-assemblyai",
"version": "0.0.0",
"description": "AssemblyAI plugin for LiveKit Agents for Node.js",
"main": "dist/index.js",
"require": "dist/index.cjs",
"types": "dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js",
"require": "./dist/index.cjs"
}
},
"author": "LiveKit",
"type": "module",
"repository": "git@github.com:livekit/agents-js.git",
"license": "Apache-2.0",
"files": [
"dist",
"src",
"README.md"
],
"scripts": {
"build": "tsup --onSuccess \"tsc --declaration --emitDeclarationOnly\"",
"clean": "rm -rf dist",
"clean:build": "pnpm clean && pnpm build",
"lint": "eslint -f unix \"src/**/*.{ts,js}\"",
"api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript",
"api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose"
},
"devDependencies": {
"@livekit/agents": "workspace:^x",
"@livekit/agents-plugin-silero": "workspace:^x",
"@livekit/agents-plugins-test": "workspace:^x",
"@livekit/rtc-node": "^0.13.4",
"@microsoft/api-extractor": "^7.35.0",
"tsup": "^8.3.5",
"typescript": "^5.0.0"
},
"dependencies": {
"assemblyai": "^4.9.0"
},
"peerDependencies": {
"@livekit/agents": "workspace:^x",
"@livekit/rtc-node": "^0.13.4"
}
}
5 changes: 5 additions & 0 deletions plugins/assemblyai/src/index.ts
@@ -0,0 +1,5 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

export * from './stt.js';
13 changes: 13 additions & 0 deletions plugins/assemblyai/src/stt.test.ts
@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { initializeLogger } from '@livekit/agents';
import { VAD } from '@livekit/agents-plugin-silero';
import { stt } from '@livekit/agents-plugins-test';
import { describe } from 'vitest';
import { STT } from './stt.js';

describe('AssemblyAI', async () => {
initializeLogger({ pretty: false });
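// nonStreaming is disabled because this plugin's _recognize (batch
// recognition) intentionally throws; only the streaming path is exercised.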
await stt(new STT(), await VAD.load(), { nonStreaming: false });
});
197 changes: 197 additions & 0 deletions plugins/assemblyai/src/stt.ts
@@ -0,0 +1,197 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
// SPDX-FileCopyrightText: 2024 Josiah Bryan, LLC
//
// SPDX-License-Identifier: Apache-2.0
import { type AudioBuffer, AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents';
import type { AudioFrame } from '@livekit/rtc-node';
import { AssemblyAI } from 'assemblyai';
import type { RealtimeTranscriber } from 'assemblyai';

export interface STTOptions {
apiKey?: string;
interimResults: boolean;
sampleRate: number;
keywords: [string, number][];
endUtteranceSilenceThreshold?: number;
}

const defaultSTTOptions: STTOptions = {
apiKey: process.env.ASSEMBLY_AI_KEY,
interimResults: true,
sampleRate: 16000,
keywords: [],
// NOTE:
// AssemblyAI's default is 700ms. A lower default of 200ms is used here
// because turn detection is expected to be handled by LiveKit's new
// end-of-utterance (EOU) model. Even though this means a final transcript
// can arrive before the user is done speaking, the EOU model still
// differentiates turns correctly and does not interrupt.
// Ref: https://blog.livekit.io/using-a-transformer-to-improve-end-of-turn-detection/
endUtteranceSilenceThreshold: 200,
};
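
// Example (hypothetical values): boost domain-specific vocabulary and restore
// AssemblyAI's stock 700ms silence threshold:
//   const sttWithBoost = new STT({ keywords: [['LiveKit', 1]], endUtteranceSilenceThreshold: 700 });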

export class STT extends stt.STT {
#opts: STTOptions;
#logger = log();
label = 'assemblyai.STT';

constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
super({
streaming: true,
interimResults: opts.interimResults ?? defaultSTTOptions.interimResults,
});
if (opts.apiKey === undefined && defaultSTTOptions.apiKey === undefined) {
throw new Error(
'AssemblyAI API key is required, either as an argument or via $ASSEMBLY_AI_KEY',
);
}

this.#opts = { ...defaultSTTOptions, ...opts };
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async _recognize(_: AudioBuffer): Promise<stt.SpeechEvent> {
throw new Error('Recognize is not supported on AssemblyAI STT');
}

stream(): stt.SpeechStream {
return new SpeechStream(this, this.#opts);
}
}

export class SpeechStream extends stt.SpeechStream {
#opts: STTOptions;
#audioEnergyFilter: AudioEnergyFilter;
#logger = log();
#speaking = false;
#client: AssemblyAI;
#transcriber?: RealtimeTranscriber;
label = 'assemblyai.SpeechStream';

constructor(stt: STT, opts: STTOptions) {
super(stt);
this.#opts = opts;
this.closed = false;
this.#audioEnergyFilter = new AudioEnergyFilter();
this.#client = new AssemblyAI({
// #opts is already merged with defaultSTTOptions, so this falls back to process.env.ASSEMBLY_AI_KEY; '' only guards against undefined.
apiKey: this.#opts.apiKey || '',
});

this.#run();
}

async #run() {
try {
// Create the realtime transcriber with parameters that AssemblyAI supports
this.#transcriber = this.#client.realtime.transcriber({
sampleRate: this.#opts.sampleRate,
wordBoost: this.#opts.keywords.map((k) => k[0]),
endUtteranceSilenceThreshold: this.#opts.endUtteranceSilenceThreshold,
});

// Set up event handlers
this.#transcriber.on('open', (data) => {
this.#logger
.child({ sessionId: data.sessionId, expiresAt: data.expiresAt })
.debug(`AssemblyAI session opened`);
});

this.#transcriber.on('close', (code, reason) => {
this.#logger.child({ code, reason }).debug(`AssemblyAI session closed`);
if (!this.closed) {
// Try to reconnect if not intentionally closed
this.#run();
}
});

this.#transcriber.on('error', (error) => {
this.#logger.child({ error: error.message }).error(`AssemblyAI error`);
});

this.#transcriber.on('transcript', (transcript) => {
if (this.closed) return;

if (!transcript.text || transcript.text.trim() === '') {
return;
}

// If we haven't started speaking yet, emit a start of speech event
if (!this.#speaking) {
this.#speaking = true;
this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
}

// Handle partial and final transcripts
if (transcript.message_type === 'PartialTranscript') {
this.queue.put({
type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
alternatives: [assemblyTranscriptToSpeechData(transcript)],
});
} else if (transcript.message_type === 'FinalTranscript') {
this.queue.put({
type: stt.SpeechEventType.FINAL_TRANSCRIPT,
alternatives: [assemblyTranscriptToSpeechData(transcript)],
});
}
});

// Connect to the AssemblyAI service
await this.#transcriber.connect();

// Process audio data from the input stream
const sendTask = async () => {
const samples100Ms = Math.floor(this.#opts.sampleRate / 10);
const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples100Ms);

for await (const data of this.input) {
if (this.closed) break;

let frames: AudioFrame[];
if (data === SpeechStream.FLUSH_SENTINEL) {
frames = stream.flush();
} else if (data.sampleRate === this.#opts.sampleRate) {
frames = stream.write(data.data.buffer);
} else {
throw new Error(`Sample rate of frame does not match stream sample rate`);
}

for (const frame of frames) {
if (this.#audioEnergyFilter.pushFrame(frame)) {
// Send audio data to AssemblyAI
this.#transcriber?.sendAudio(new Uint8Array(frame.data.buffer));
}
}
}

// Close the connection when done
if (this.#transcriber) {
await this.#transcriber.close();
}
};

// Start processing audio
await sendTask();
} catch (error: any) {
this.#logger.child({ error: error.message }).error(`Error in AssemblyAI STT`);

// Try to reconnect after a delay if not intentionally closed
if (!this.closed) {
setTimeout(() => this.#run(), 5000);
}
}
}
}

// Helper function to convert AssemblyAI transcript to SpeechData
const assemblyTranscriptToSpeechData = (transcript: any): stt.SpeechData => {
return {
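// Hard-coded: AssemblyAI's realtime endpoint supports English only at the time of writing.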
language: 'en-US',
startTime: transcript.audio_start || 0,
endTime: transcript.audio_end || 0,
confidence: transcript.confidence || 1.0,
text: transcript.text || '',
};
};
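
For reference, a hedged sketch of consuming the stream (this assumes the base
`stt.SpeechStream` from `@livekit/agents` exposes `pushFrame` and async
iteration, as the other STT plugins in this repository do):

```typescript
import { stt } from '@livekit/agents';
import { STT } from '@livekit/agents-plugin-assemblyai';

const stream = new STT().stream();

// Elsewhere in the agent, audio frames are pushed in as they arrive:
//   stream.pushFrame(frame); // frame: AudioFrame from @livekit/rtc-node

for await (const event of stream) {
  if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
    console.log('final transcript:', event.alternatives?.[0].text);
  }
}
```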
16 changes: 16 additions & 0 deletions plugins/assemblyai/tsconfig.json
@@ -0,0 +1,16 @@
{
"extends": "../../tsconfig.json",
"include": ["./src"],
"compilerOptions": {
// match output dir to input dir. e.g. dist/index instead of dist/src/index
"rootDir": "./src",
"declarationDir": "./dist",
"outDir": "./dist"
},
"typedocOptions": {
"name": "plugins/agents-plugin-assemblyai",
"entryPointStrategy": "resolve",
"readme": "none",
"entryPoints": ["src/index.ts"]
}
}
7 changes: 7 additions & 0 deletions plugins/assemblyai/tsup.config.ts
@@ -0,0 +1,7 @@
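// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0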
import { defineConfig } from 'tsup';

import defaults from '../../tsup.config';

export default defineConfig({
...defaults,
});