diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ce8a8860..0f1750a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout Repo - uses: actions/checkout@v5 + uses: actions/checkout@v6 - uses: pnpm/action-setup@v4 - name: Use Node.js 24 uses: actions/setup-node@v6 @@ -81,7 +81,7 @@ jobs: node-version: [20, 22, 24, latest] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - uses: pnpm/action-setup@v4 - name: Setup Node.js uses: actions/setup-node@v6 @@ -90,8 +90,7 @@ jobs: cache: pnpm - name: Install dependencies run: pnpm install - - name: Test livekit-rtc - run: pnpm --filter="livekit-rtc" test + # RTC tests will be ran after they are built, because builds are so slow - name: Test livekit-server-sdk (Node) run: pnpm --filter="livekit-server-sdk" test - name: Test livekit-server-sdk (Browser) @@ -131,7 +130,7 @@ jobs: RUST_BACKTRACE: full needs: check-changes steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 with: submodules: recursive @@ -169,26 +168,70 @@ jobs: - name: Install dependencies run: pnpm install - - name: Build (Linux) + # on linux, we'll also run tests after building. for the e2e suite, this would + # only run on the main repo, and not forks + - name: Build & test (Linux) if: ${{ matrix.platform == 'linux' }} + env: + LIVEKIT_URL: ${{ secrets.LIVEKIT_URL }} + LIVEKIT_API_KEY: ${{ secrets.LIVEKIT_API_KEY }} + LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }} run: | PROTOC_PATH=$(which protoc) + HOST_UID=$(id -u) + HOST_GID=$(id -g) docker run --rm \ + -e HOST_UID=$HOST_UID \ + -e HOST_GID=$HOST_GID \ + -e TARGET=${{ matrix.target }} \ -v $PWD:/workspace \ -v $PROTOC_PATH:/tmp/protoc:ro \ -w /workspace \ ${{ matrix.build_image }} \ - bash -c "\ - uname -a; \ - cp /tmp/protoc /usr/local/bin/protoc && chmod +x /usr/local/bin/protoc; \ - export PATH=/root/.cargo/bin:\$PATH; \ - export RUST_BACKTRACE=full; \ - yum install -y openssl-devel libX11-devel mesa-libGL-devel libXext-devel libva-devel libdrm-devel clang-devel; \ - curl --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y; \ - curl -fsSL https://rpm.nodesource.com/setup_20.x | bash -; \ - yum install -y nodejs --setopt=nodesource-nodejs.module_hotfixes=1; \ - npm install --global pnpm && pnpm install; \ - cd packages/livekit-rtc && pnpm build --target ${{ matrix.target }}" + bash -lc ' + set -euo pipefail + + uname -a + cp /tmp/protoc /usr/local/bin/protoc + chmod +x /usr/local/bin/protoc + + yum install -y openssl-devel libX11-devel mesa-libGL-devel libXext-devel libva-devel libdrm-devel clang clang-devel + yum install -y gcc-toolset-12-gcc gcc-toolset-12-gcc-c++ gcc-toolset-12-libstdc++-devel libstdc++-devel + source /opt/rh/gcc-toolset-12/enable + gcc --version + g++ --version + clang --version + clang++ --version + curl -fsSL https://rpm.nodesource.com/setup_20.x | bash - + yum install -y nodejs --setopt=nodesource-nodejs.module_hotfixes=1 + npm install --global pnpm + + groupadd -g "$HOST_GID" hostgroup 2>/dev/null || true + useradd -m -u "$HOST_UID" -g "$HOST_GID" hostuser 2>/dev/null || true + + su - hostuser -c " + set -euo pipefail + export RUST_BACKTRACE=full + curl --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + export PATH=\$HOME/.cargo/bin:\$PATH + source /opt/rh/gcc-toolset-12/enable + export CC=clang + export CXX=clang++ + TOOLCHAIN_ROOT=/opt/rh/gcc-toolset-12/root/usr + export CFLAGS=\"--gcc-toolchain=\$TOOLCHAIN_ROOT\" + export CXXFLAGS=\"--gcc-toolchain=\$TOOLCHAIN_ROOT\" + cd /workspace + CI=true pnpm install + cd packages/livekit-rtc + pnpm build --target $TARGET + cd ../.. + pnpm --filter "./packages/livekit-server-sdk" build + export LIVEKIT_URL=${{ secrets.LIVEKIT_URL }} + export LIVEKIT_API_KEY=${{ secrets.LIVEKIT_API_KEY }} + export LIVEKIT_API_SECRET=${{ secrets.LIVEKIT_API_SECRET }} + pnpm --filter "@livekit/rtc-node" test + " + ' - name: Build (macOS) if: ${{ matrix.platform == 'macos' }} diff --git a/examples/agent-dispatch/package.json b/examples/agent-dispatch/package.json index 2fb41d92..be2e45a3 100644 --- a/examples/agent-dispatch/package.json +++ b/examples/agent-dispatch/package.json @@ -13,7 +13,8 @@ }, "dependencies": { "dotenv": "^16.4.5", - "livekit-server-sdk": "workspace:*" + "livekit-server-sdk": "workspace:*", + "@livekit/protocol": "^1.43.4" }, "devDependencies": { "@types/node": "^20.10.4", diff --git a/packages/livekit-rtc/package.json b/packages/livekit-rtc/package.json index 2ffce480..85e0d9f4 100644 --- a/packages/livekit-rtc/package.json +++ b/packages/livekit-rtc/package.json @@ -57,9 +57,11 @@ "@bufbuild/protoc-gen-es": "^1.10.1", "@napi-rs/cli": "^2.18.0", "@types/node": "^22.13.10", + "vitest": "^3.0.0", "prettier": "^3.0.3", "tsup": "^8.3.5", - "typescript": "5.8.2" + "typescript": "5.8.2", + "livekit-server-sdk": "workspace:*" }, "optionalDependencies": { "@livekit/rtc-node-darwin-arm64": "workspace:*", @@ -78,6 +80,8 @@ "artifacts": "pnpm build:ts && napi artifacts", "build:debug": "napi build --platform", "lint": "eslint -f unix \"src/**/*.ts\" --ignore-pattern \"src/proto/*\"", + "test": "vitest run", + "test:e2e": "vitest run src/tests/e2e.test.ts", "universal": "napi universal", "version": "napi version" } diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 71885b6a..1f75e16c 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { AsyncQueue } from './async_queue.js'; import { AudioFrame } from './audio_frame.js'; +import { log } from './log.js'; // Re-export AsyncQueue for backward compatibility export { AsyncQueue } from './async_queue.js'; @@ -238,7 +239,7 @@ export class AudioMixer { for (const result of results) { if (result.status !== 'fulfilled') { - console.warn('AudioMixer: Stream contribution failed:', result.reason); + log.warn('AudioMixer: Stream contribution failed:', result.reason); continue; } @@ -261,6 +262,12 @@ export class AudioMixer { this.removeStream(stream); } + // If all streams are exhausted, end the mixer automatically. + // This keeps `for await...of` consumers from hanging indefinitely when inputs complete. + if (!this.ending && removals.length > 0 && this.streams.size === 0) { + this.ending = true; + } + if (!anyData) { // No data available from any stream, wait briefly before trying again await this.sleep(1); @@ -372,25 +379,29 @@ export class AudioMixer { } const length = this.chunkSize * this.numChannels; - const mixed = new Int16Array(length); + // Use a wider accumulator to avoid int16 overflow while summing. + const acc = new Int32Array(length); // Sum all contributions for (const contrib of contributions) { for (let i = 0; i < length; i++) { const val = contrib[i]; if (val !== undefined) { - mixed[i] = (mixed[i] ?? 0) + val; + acc[i] = (acc[i] ?? 0) + val; } } } // Clip to Int16 range + const mixed = new Int16Array(length); for (let i = 0; i < length; i++) { - const val = mixed[i] ?? 0; + const val = acc[i] ?? 0; if (val > 32767) { mixed[i] = 32767; } else if (val < -32768) { mixed[i] = -32768; + } else { + mixed[i] = val; } } diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index b68778f7..5122d05e 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -155,7 +155,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter } get creationTime(): Date { - return new Date(Number(this.info?.creationTime ?? 0)); + // TODO: workaround for Rust SDK bug, remove after updating to: + // https://github.com/livekit/rust-sdks/pull/822 + // check if creationTime looks like seconds (less than year 3000 in ms), convert to ms if needed + let creationTimeMs = Number(this.info?.creationTime ?? 0); + if (creationTimeMs > 0 && creationTimeMs < 1e12) { + creationTimeMs *= 1000; + } + return new Date(creationTimeMs); } get isRecording(): boolean { @@ -362,7 +369,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.info.disconnectReason = ev.value.disconnectReason; this.emit(RoomEvent.ParticipantDisconnected, participant); } else { - console.log(`RoomEvent.ParticipantDisconnected: Could not find participant`); + log.warn(`RoomEvent.ParticipantDisconnected: Could not find participant`); } } else if (ev.case == 'localTrackPublished') { const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); @@ -377,7 +384,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.resolveFirstSubscription(); this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!); } else { - console.warn(`RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`); + log.warn(`RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`); } } else if (ev.case == 'trackPublished') { const participant = this.remoteParticipants.get(ev.value.participantIdentity!); @@ -385,7 +392,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter if (participant) { participant.trackPublications.set(publication.sid!, publication); } else { - console.warn( + log.warn( `RoomEvent.TrackPublished: Could not find participant: ${ev.value.participantIdentity}`, ); } @@ -397,7 +404,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter if (publication) { this.emit(RoomEvent.TrackUnpublished, publication, participant); } else { - console.warn(`RoomEvent.TrackUnpublished: Could not find publication`); + log.warn(`RoomEvent.TrackUnpublished: Could not find publication`); } } else if (ev.case == 'trackSubscribed') { const ownedTrack = ev.value.track!; @@ -416,7 +423,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); } catch (e: unknown) { - console.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`); + log.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`); } } else if (ev.case == 'trackUnsubscribed') { try { @@ -429,7 +436,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.subscribed = false; this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); } catch (e: unknown) { - console.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`); + log.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`); } } else if (ev.case == 'trackSubscriptionFailed') { try { @@ -441,7 +448,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter ev.value.error, ); } catch (e: unknown) { - console.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`); + log.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`); } } else if (ev.case == 'trackMuted') { try { @@ -455,7 +462,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter } this.emit(RoomEvent.TrackMuted, publication, participant); } catch (e: unknown) { - console.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`); + log.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`); } } else if (ev.case == 'trackUnmuted') { try { @@ -469,7 +476,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter } this.emit(RoomEvent.TrackUnmuted, publication, participant); } catch (e: unknown) { - console.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`); + log.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`); } } else if (ev.case == 'activeSpeakersChanged') { try { @@ -478,7 +485,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter ); this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); } catch (e: unknown) { - console.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`); } } else if (ev.case == 'roomMetadataChanged') { this.info.metadata = ev.value.metadata ?? ''; @@ -489,7 +496,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.info.metadata = ev.value.metadata; this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); } catch (e: unknown) { - console.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`); } } else if (ev.case == 'participantNameChanged') { try { @@ -497,7 +504,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.info.name = ev.value.name; this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); } catch (e: unknown) { - console.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`); } } else if (ev.case == 'participantAttributesChanged') { try { @@ -520,14 +527,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); } } catch (e: unknown) { - console.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`); } } else if (ev.case == 'connectionQualityChanged') { try { const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); } catch (e: unknown) { - console.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`); } } else if (ev.case == 'chatMessage') { const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); @@ -612,7 +619,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant, ); } catch (e: unknown) { - console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`); + log.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`); } } }; diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts new file mode 100644 index 00000000..5ea186fe --- /dev/null +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -0,0 +1,514 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { AccessToken } from 'livekit-server-sdk'; +import { randomUUID } from 'node:crypto'; +import { setTimeout as delay } from 'node:timers/promises'; +import { afterAll, describe, expect, it } from 'vitest'; +import { + AudioFrame, + AudioSource, + AudioStream, + ConnectionState, + LocalAudioTrack, + ParticipantKind, + Room, + RoomEvent, + RpcError, + TrackPublishOptions, + TrackSource, + dispose, +} from '../index.js'; + +const hasE2EEnv = + !!process.env.LIVEKIT_URL && !!process.env.LIVEKIT_API_KEY && !!process.env.LIVEKIT_API_SECRET; +const describeE2E = hasE2EEnv ? describe : describe.skip; +const testTimeoutMs = 10_000; + +type TestEnv = { + url: string; + apiKey: string; + apiSecret: string; +}; + +function normalizeLiveKitUrl(url: string): string { + if (url.startsWith('http://')) return `ws://${url.slice('http://'.length)}`; + if (url.startsWith('https://')) return `wss://${url.slice('https://'.length)}`; + return url; +} + +function getTestEnv(): TestEnv { + if (!hasE2EEnv) { + throw new Error( + 'Missing required env vars for e2e: LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET', + ); + } + return { + url: normalizeLiveKitUrl(process.env.LIVEKIT_URL!), + apiKey: process.env.LIVEKIT_API_KEY!, + apiSecret: process.env.LIVEKIT_API_SECRET!, + }; +} + +async function withTimeout(promise: Promise, timeoutMs: number, message: string): Promise { + return await Promise.race([ + promise, + (async () => { + await delay(timeoutMs); + throw new Error(message); + })(), + ]); +} + +async function waitFor( + condition: () => boolean, + opts: { timeoutMs: number; intervalMs?: number; debugName?: string }, +): Promise { + const intervalMs = opts.intervalMs ?? 50; + const deadline = Date.now() + opts.timeoutMs; + while (Date.now() < deadline) { + if (condition()) return; + await delay(intervalMs); + } + throw new Error(`Timed out waiting for condition${opts.debugName ? ` (${opts.debugName})` : ''}`); +} + +async function createJoinToken(params: { + env: TestEnv; + roomName: string; + identity: string; + name: string; +}): Promise { + const token = new AccessToken(params.env.apiKey, params.env.apiSecret, { + identity: params.identity, + name: params.name, + ttl: '30m', + }); + token.addGrant({ + room: params.roomName, + roomJoin: true, + roomCreate: true, + canPublish: true, + canSubscribe: true, + }); + return await token.toJwt(); +} + +async function connectTestRooms(count: number): Promise<{ roomName: string; rooms: Room[] }> { + const env = getTestEnv(); + const roomName = `test_room_${randomUUID()}`; + const rooms = await Promise.all( + Array.from({ length: count }, async (_, i) => { + const token = await createJoinToken({ + env, + roomName, + identity: `p${i}`, + name: `Participant ${i}`, + }); + const room = new Room(); + await room.connect(env.url, token, { autoSubscribe: true, dynacast: false }); + return room; + }), + ); + + const start = Date.now(); + await waitFor(() => rooms.every((r) => r.remoteParticipants.size === count - 1), { + timeoutMs: 5000, + debugName: `participant visibility (${Date.now() - start}ms)`, + }); + + return { roomName, rooms }; +} + +function waitForRoomEvent( + room: Room, + event: RoomEvent, + timeoutMs: number, + take: (...args: any[]) => R, +): Promise { + return withTimeout( + new Promise((resolve) => { + const handler = (...args: any[]) => { + // typed-emitter doesn't expose `.once` in the type surface, so do manual once+cleanup. + room.off(event as any, handler as any); + resolve(take(...args)); + }; + room.on(event as any, handler as any); + }), + timeoutMs, + `Timed out waiting for ${event}`, + ); +} + +function concatUint8(chunks: Uint8Array[]): Uint8Array { + const len = chunks.reduce((acc, c) => acc + c.byteLength, 0); + const out = new Uint8Array(len); + let offset = 0; + for (const c of chunks) { + out.set(c, offset); + offset += c.byteLength; + } + return out; +} + +function channelSamples(frame: AudioFrame, channelIndex: number): Int16Array { + const { data, channels, samplesPerChannel } = frame; + const out = new Int16Array(samplesPerChannel); + for (let i = 0; i < samplesPerChannel; i++) { + out[i] = data[i * channels + channelIndex]!; + } + return out; +} + +function estimateFreqHz(samples: Int16Array, sampleRate: number): number { + if (samples.length < sampleRate / 10) { + // need at least ~100ms for low-frequency signals + return 0; + } + + // basic autocorrelation over a plausible band around 60Hz + const expectedHz = 60; + const minHz = 20; + const maxHz = 200; + const minLag = Math.floor(sampleRate / maxHz); + const maxLag = Math.floor(sampleRate / minHz); + const expectedLag = Math.floor(sampleRate / expectedHz); + const searchRadius = Math.floor(expectedLag * 0.5); + const lagStart = Math.max(minLag, expectedLag - searchRadius); + const lagEnd = Math.min(maxLag, expectedLag + searchRadius); + + const x = new Float64Array(samples.length); + let mean = 0; + for (let i = 0; i < samples.length; i++) mean += samples[i]!; + mean /= samples.length; + for (let i = 0; i < samples.length; i++) x[i] = samples[i]! - mean; + + let bestLag = 0; + let bestCorr = -Infinity; + for (let lag = lagStart; lag <= lagEnd; lag++) { + let corr = 0; + for (let i = 0; i < x.length - lag; i++) corr += x[i]! * x[i + lag]!; + if (corr > bestCorr) { + bestCorr = corr; + bestLag = lag; + } + } + + return bestLag > 0 ? sampleRate / bestLag : 0; +} + +describeE2E('livekit-rtc e2e', () => { + afterAll(async () => { + await dispose(); + }); + + it( + 'connects to a room', + async () => { + const { roomName, rooms } = await connectTestRooms(1); + const room = rooms[0]!; + + expect(room.connectionState).toBe(ConnectionState.CONN_CONNECTED); + expect(room.name).toBe(roomName); + expect(room.remoteParticipants.size).toBe(0); + + expect(room.creationTime.getTime()).toBeGreaterThan(0); + expect(Math.abs(room.creationTime.getTime() - Date.now())).toBeLessThanOrEqual(10_000); + + expect(room.localParticipant?.sid).toMatch(/^PA_/); + expect(room.localParticipant?.identity).toBe('p0'); + expect(room.localParticipant?.name).toBe('Participant 0'); + expect(room.localParticipant?.kind).toBe(ParticipantKind.STANDARD); + + await room.disconnect(); + }, + testTimeoutMs, + ); + + it( + 'connects multiple participants to the same room', + async () => { + const { roomName, rooms } = await connectTestRooms(2); + const [first, second] = rooms; + + expect(first?.name).toBe(roomName); + expect(second?.name).toBe(roomName); + expect(first?.remoteParticipants.get(second!.localParticipant!.identity)).toBeTruthy(); + expect(second?.remoteParticipants.get(first!.localParticipant!.identity)).toBeTruthy(); + + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs, + ); + + it( + 'emits participantDisconnected when a participant leaves', + async () => { + const { rooms } = await connectTestRooms(2); + const [first, second] = rooms; + const secondIdentity = second!.localParticipant!.identity; + const secondName = second!.localParticipant!.name!; + + const disconnected = waitForRoomEvent( + first!, + RoomEvent.ParticipantDisconnected, + testTimeoutMs, + (p: { identity: string; name?: string }) => ({ identity: p.identity, name: p.name ?? '' }), + ); + + await second!.disconnect(); + + const ev = await disconnected; + expect(ev.identity).toBe(secondIdentity); + expect(ev.name).toBe(secondName); + + await first!.disconnect(); + }, + testTimeoutMs, + ); + + it( + 'transfers audio between two participants (sine detection)', + async () => { + const cases = [ + { pubRateHz: 48_000, pubChannels: 1, subRateHz: 48_000, subChannels: 1 }, + { pubRateHz: 48_000, pubChannels: 2, subRateHz: 48_000, subChannels: 2 }, + { pubRateHz: 48_000, pubChannels: 2, subRateHz: 24_000, subChannels: 2 }, + { pubRateHz: 24_000, pubChannels: 2, subRateHz: 24_000, subChannels: 1 }, + ] as const; + + for (const params of cases) { + const { rooms } = await connectTestRooms(2); + const [subRoom, pubRoom] = rooms; + + const subscribed = waitForRoomEvent( + subRoom!, + RoomEvent.TrackSubscribed, + 15_000, + (track: unknown) => track, + ); + + const source = new AudioSource(params.pubRateHz, params.pubChannels); + const track = LocalAudioTrack.createAudioTrack('sine', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await pubRoom!.localParticipant!.publishTrack(track, options); + + const remoteTrack = await subscribed; + const stream = new AudioStream(remoteTrack as any, { + sampleRate: params.subRateHz, + numChannels: params.subChannels, + }); + const reader = stream.getReader(); + + const sineHz = 60; + const framesToAnalyze = 100; + const collected: Int16Array[] = Array.from( + { length: params.subChannels }, + () => new Int16Array(0), + ); + + const readTask = (async () => { + let frames = 0; + while (frames < framesToAnalyze) { + const { done, value } = await reader.read(); + if (done) break; + expect(value.sampleRate).toBe(params.subRateHz); + expect(value.channels).toBe(params.subChannels); + for (let ch = 0; ch < params.subChannels; ch++) { + const s = channelSamples(value, ch); + const prev = collected[ch]!; + const next = new Int16Array(prev.length + s.length); + next.set(prev, 0); + next.set(s, prev.length); + collected[ch] = next; + } + frames++; + } + expect(frames).toBe(framesToAnalyze); + })(); + + const samplesPer10ms = Math.floor(params.pubRateHz / 100); + const amplitude = 0.8 * 32767; + const publishTask = (async () => { + let t = 0; + for (let i = 0; i < framesToAnalyze + 20; i++) { + const frame = AudioFrame.create(params.pubRateHz, params.pubChannels, samplesPer10ms); + for (let s = 0; s < samplesPer10ms; s++) { + const v = Math.round( + amplitude * Math.sin((2 * Math.PI * sineHz * t) / params.pubRateHz), + ); + t++; + for (let ch = 0; ch < params.pubChannels; ch++) { + frame.data[s * params.pubChannels + ch] = v; + } + } + await source.captureFrame(frame); + } + await source.waitForPlayout(); + })(); + + await withTimeout( + Promise.all([readTask, publishTask]), + 20_000, + 'Timed out during audio test', + ); + + for (let ch = 0; ch < params.subChannels; ch++) { + const detected = estimateFreqHz(collected[ch]!, params.subRateHz); + expect(Math.abs(detected - sineHz)).toBeLessThan(20); + } + + reader.releaseLock(); + await track.close(); + await Promise.all(rooms.map((r) => r.disconnect())); + } + }, + testTimeoutMs * 2, + ); + + it( + 'publishes and receives reliable data packets', + async () => { + const { rooms } = await connectTestRooms(2); + const [receivingRoom, sendingRoom] = rooms; + const receiverIdentity = receivingRoom!.localParticipant!.identity; + + const iterations = 128; + const payload = new Uint8Array(4096).fill(0xfa); + let received = 0; + + const receiveTask = withTimeout( + new Promise((resolve) => { + receivingRoom!.on(RoomEvent.DataReceived, (data, participant) => { + if (participant?.identity !== sendingRoom!.localParticipant!.identity) return; + if (data.byteLength !== payload.byteLength) return; + received++; + if (received === iterations) resolve(); + }); + }), + testTimeoutMs, + 'Timed out waiting for all reliable packets', + ); + + const sendTask = (async () => { + for (let i = 0; i < iterations; i++) { + await sendingRoom!.localParticipant!.publishData(payload, { + reliable: true, + destination_identities: [receiverIdentity], + }); + await delay(10); + } + })(); + + await Promise.all([receiveTask, sendTask]); + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs, + ); + + it( + 'sends and receives text and byte streams', + async () => { + const { rooms } = await connectTestRooms(2); + const [receivingRoom, sendingRoom] = rooms; + const senderIdentity = sendingRoom!.localParticipant!.identity; + + const topic = 'some-topic'; + + const textToSend = 'some-text'; + const receivedText = withTimeout( + new Promise((resolve) => { + receivingRoom!.registerTextStreamHandler(topic, async (reader, sender) => { + expect(sender.identity).toBe(senderIdentity); + resolve(await reader.readAll()); + }); + }), + testTimeoutMs, + 'Timed out waiting for text stream', + ); + + const textInfo = await sendingRoom!.localParticipant!.sendText(textToSend, { topic }); + expect(textInfo.streamId).toBeTruthy(); + expect(Math.abs(textInfo.timestamp - Date.now())).toBeLessThanOrEqual(1_000); + expect(textInfo.mimeType).toBe('text/plain'); + expect(textInfo.topic).toBe(topic); + + expect(await receivedText).toBe(textToSend); + + const bytesToSend = new Uint8Array(16).fill(0xfa); + const receivedBytes = withTimeout( + new Promise((resolve) => { + receivingRoom!.registerByteStreamHandler(topic, async (reader, sender) => { + expect(sender.identity).toBe(senderIdentity); + const chunks = await reader.readAll(); + resolve(concatUint8(chunks)); + }); + }), + testTimeoutMs, + 'Timed out waiting for byte stream', + ); + + const writer = await sendingRoom!.localParticipant!.streamBytes({ + topic, + totalSize: bytesToSend.byteLength, + }); + await writer.write(bytesToSend); + await writer.close(); + + const byteInfo = writer.info; + expect(byteInfo.streamId).toBeTruthy(); + expect(Math.abs(byteInfo.timestamp - Date.now())).toBeLessThanOrEqual(1_000); + expect(byteInfo.mimeType).toBe('application/octet-stream'); + expect(byteInfo.topic).toBe(topic); + + expect(await receivedBytes).toEqual(bytesToSend); + + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs, + ); + + it( + 'invokes RPC methods and returns structured errors', + async () => { + const { rooms } = await connectTestRooms(2); + const [callerRoom, calleeRoom] = rooms; + + const method = 'test-method'; + const payload = 'test-payload'; + + calleeRoom!.localParticipant!.registerRpcMethod(method, async (data) => data.payload); + + await expect( + callerRoom!.localParticipant!.performRpc({ + destinationIdentity: calleeRoom!.localParticipant!.identity, + method, + payload, + responseTimeout: 500, + }), + ).resolves.toBe(payload); + + await expect( + callerRoom!.localParticipant!.performRpc({ + destinationIdentity: calleeRoom!.localParticipant!.identity, + method: 'unregistered-method', + payload, + responseTimeout: 500, + }), + ).rejects.toMatchObject({ code: RpcError.ErrorCode.UNSUPPORTED_METHOD }); + + await expect( + callerRoom!.localParticipant!.performRpc({ + destinationIdentity: 'unknown-participant', + method, + payload, + responseTimeout: 500, + }), + ).rejects.toMatchObject({ code: RpcError.ErrorCode.CONNECTION_TIMEOUT }); + + await Promise.all(rooms.map((r) => r.disconnect())); + }, + testTimeoutMs * 2, + ); +}); diff --git a/packages/livekit-rtc/tsconfig.test.json b/packages/livekit-rtc/tsconfig.test.json new file mode 100644 index 00000000..234fdbd1 --- /dev/null +++ b/packages/livekit-rtc/tsconfig.test.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "lib": ["es2022", "dom"], + "types": ["node", "vitest"] + }, + "include": ["src/**/*.test.ts", "src/e2e/**/*.ts", "vite.config.js"], + "exclude": ["dist", "target", "node_modules"] +} diff --git a/packages/livekit-rtc/vite.config.js b/packages/livekit-rtc/vite.config.js index cf74485c..022d5f31 100644 --- a/packages/livekit-rtc/vite.config.js +++ b/packages/livekit-rtc/vite.config.js @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -import { defineConfig } from 'vite'; +import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { diff --git a/packages/livekit-server-sdk/package.json b/packages/livekit-server-sdk/package.json index 692f90d7..1a6eeeb4 100644 --- a/packages/livekit-server-sdk/package.json +++ b/packages/livekit-server-sdk/package.json @@ -30,7 +30,7 @@ "src" ], "scripts": { - "build": "tsup --onSuccess \"tsc --declaration --emitDeclarationOnly\"", + "build": "tsup --onSuccess \"tsc -p tsconfig.json --declaration --emitDeclarationOnly --skipLibCheck\"", "build:watch": "tsc --watch", "build-docs": "typedoc", "changeset": "changeset", diff --git a/packages/livekit-server-sdk/tsconfig.json b/packages/livekit-server-sdk/tsconfig.json index 9d439bb7..ec1f47b4 100644 --- a/packages/livekit-server-sdk/tsconfig.json +++ b/packages/livekit-server-sdk/tsconfig.json @@ -2,7 +2,10 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "declarationDir": "dist" + "declarationDir": "dist", + "module": "esnext", + "moduleResolution": "bundler", + "lib": ["es2020"] }, "include": ["src/**/*.ts"], "exclude": ["src/**/*.test.ts", "vite.config.ts"], diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0d330458..0909343e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -74,6 +74,9 @@ importers: examples/agent-dispatch: dependencies: + '@livekit/protocol': + specifier: ^1.43.4 + version: 1.43.4 dotenv: specifier: ^16.4.5 version: 16.4.5 @@ -228,6 +231,9 @@ importers: '@types/node': specifier: ^22.13.10 version: 22.15.29 + livekit-server-sdk: + specifier: workspace:* + version: link:../livekit-server-sdk prettier: specifier: ^3.0.3 version: 3.6.2 @@ -237,6 +243,9 @@ importers: typescript: specifier: 5.8.2 version: 5.8.2 + vitest: + specifier: ^3.0.0 + version: 3.2.4(@edge-runtime/vm@5.0.0)(@types/node@22.15.29)(happy-dom@20.0.0) optionalDependencies: '@livekit/rtc-node-darwin-arm64': specifier: workspace:* @@ -998,6 +1007,9 @@ packages: '@livekit/protocol@1.43.4': resolution: {integrity: sha512-mJDFt/p+G2OKmIGizYiACK7Jb06wd42m9Pe7Y9cAOfdYpvwCqHlw4yul5Z7iRU3VKPsYJ27WL3oeHEoiu+HuAA==} + '@livekit/protocol@1.43.4': + resolution: {integrity: sha512-mJDFt/p+G2OKmIGizYiACK7Jb06wd42m9Pe7Y9cAOfdYpvwCqHlw4yul5Z7iRU3VKPsYJ27WL3oeHEoiu+HuAA==} + '@livekit/typed-emitter@3.0.0': resolution: {integrity: sha512-9bl0k4MgBPZu3Qu3R3xy12rmbW17e3bE9yf4YY85gJIQ3ezLEj/uzpKHWBsLaDoL5Mozz8QCgggwIBudYQWeQg==} @@ -4417,6 +4429,10 @@ snapshots: '@livekit/mutex@1.1.1': {} + '@livekit/protocol@1.43.4': + dependencies: + '@bufbuild/protobuf': 1.10.1 + '@livekit/protocol@1.43.4': dependencies: '@bufbuild/protobuf': 1.10.1