Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 60 additions & 17 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Repo
uses: actions/checkout@v5
uses: actions/checkout@v6
- uses: pnpm/action-setup@v4
- name: Use Node.js 24
uses: actions/setup-node@v6
Expand Down Expand Up @@ -81,7 +81,7 @@ jobs:
node-version: [20, 22, 24, latest]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- uses: pnpm/action-setup@v4
- name: Setup Node.js
uses: actions/setup-node@v6
Expand All @@ -90,8 +90,7 @@ jobs:
cache: pnpm
- name: Install dependencies
run: pnpm install
- name: Test livekit-rtc
run: pnpm --filter="livekit-rtc" test
# RTC tests will be ran after they are built, because builds are so slow
- name: Test livekit-server-sdk (Node)
run: pnpm --filter="livekit-server-sdk" test
- name: Test livekit-server-sdk (Browser)
Expand Down Expand Up @@ -131,7 +130,7 @@ jobs:
RUST_BACKTRACE: full
needs: check-changes
steps:
- uses: actions/checkout@v5
- uses: actions/checkout@v6
with:
submodules: recursive

Expand Down Expand Up @@ -169,26 +168,70 @@ jobs:
- name: Install dependencies
run: pnpm install

- name: Build (Linux)
# on linux, we'll also run tests after building. for the e2e suite, this would
# only run on the main repo, and not forks
- name: Build & test (Linux)
if: ${{ matrix.platform == 'linux' }}
env:
LIVEKIT_URL: ${{ secrets.LIVEKIT_URL }}
LIVEKIT_API_KEY: ${{ secrets.LIVEKIT_API_KEY }}
LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }}
run: |
PROTOC_PATH=$(which protoc)
HOST_UID=$(id -u)
HOST_GID=$(id -g)
docker run --rm \
-e HOST_UID=$HOST_UID \
-e HOST_GID=$HOST_GID \
-e TARGET=${{ matrix.target }} \
-v $PWD:/workspace \
-v $PROTOC_PATH:/tmp/protoc:ro \
-w /workspace \
${{ matrix.build_image }} \
bash -c "\
uname -a; \
cp /tmp/protoc /usr/local/bin/protoc && chmod +x /usr/local/bin/protoc; \
export PATH=/root/.cargo/bin:\$PATH; \
export RUST_BACKTRACE=full; \
yum install -y openssl-devel libX11-devel mesa-libGL-devel libXext-devel libva-devel libdrm-devel clang-devel; \
curl --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y; \
curl -fsSL https://rpm.nodesource.com/setup_20.x | bash -; \
yum install -y nodejs --setopt=nodesource-nodejs.module_hotfixes=1; \
npm install --global pnpm && pnpm install; \
cd packages/livekit-rtc && pnpm build --target ${{ matrix.target }}"
bash -lc '
set -euo pipefail

uname -a
cp /tmp/protoc /usr/local/bin/protoc
chmod +x /usr/local/bin/protoc

yum install -y openssl-devel libX11-devel mesa-libGL-devel libXext-devel libva-devel libdrm-devel clang clang-devel
yum install -y gcc-toolset-12-gcc gcc-toolset-12-gcc-c++ gcc-toolset-12-libstdc++-devel libstdc++-devel
source /opt/rh/gcc-toolset-12/enable
gcc --version
g++ --version
clang --version
clang++ --version
curl -fsSL https://rpm.nodesource.com/setup_20.x | bash -
yum install -y nodejs --setopt=nodesource-nodejs.module_hotfixes=1
npm install --global pnpm

groupadd -g "$HOST_GID" hostgroup 2>/dev/null || true
useradd -m -u "$HOST_UID" -g "$HOST_GID" hostuser 2>/dev/null || true

su - hostuser -c "
set -euo pipefail
export RUST_BACKTRACE=full
curl --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
export PATH=\$HOME/.cargo/bin:\$PATH
source /opt/rh/gcc-toolset-12/enable
export CC=clang
export CXX=clang++
TOOLCHAIN_ROOT=/opt/rh/gcc-toolset-12/root/usr
export CFLAGS=\"--gcc-toolchain=\$TOOLCHAIN_ROOT\"
export CXXFLAGS=\"--gcc-toolchain=\$TOOLCHAIN_ROOT\"
cd /workspace
CI=true pnpm install
cd packages/livekit-rtc
pnpm build --target $TARGET
cd ../..
pnpm --filter "./packages/livekit-server-sdk" build
export LIVEKIT_URL=${{ secrets.LIVEKIT_URL }}
export LIVEKIT_API_KEY=${{ secrets.LIVEKIT_API_KEY }}
export LIVEKIT_API_SECRET=${{ secrets.LIVEKIT_API_SECRET }}
pnpm --filter "@livekit/rtc-node" test
"
'

- name: Build (macOS)
if: ${{ matrix.platform == 'macos' }}
Expand Down
3 changes: 2 additions & 1 deletion examples/agent-dispatch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
},
"dependencies": {
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
"livekit-server-sdk": "workspace:*",
"@livekit/protocol": "^1.43.4"
},
"devDependencies": {
"@types/node": "^20.10.4",
Expand Down
6 changes: 5 additions & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@
"@bufbuild/protoc-gen-es": "^1.10.1",
"@napi-rs/cli": "^2.18.0",
"@types/node": "^22.13.10",
"vitest": "^3.0.0",
"prettier": "^3.0.3",
"tsup": "^8.3.5",
"typescript": "5.8.2"
"typescript": "5.8.2",
"livekit-server-sdk": "workspace:*"
},
"optionalDependencies": {
"@livekit/rtc-node-darwin-arm64": "workspace:*",
Expand All @@ -78,6 +80,8 @@
"artifacts": "pnpm build:ts && napi artifacts",
"build:debug": "napi build --platform",
"lint": "eslint -f unix \"src/**/*.ts\" --ignore-pattern \"src/proto/*\"",
"test": "vitest run",
"test:e2e": "vitest run src/tests/e2e.test.ts",
"universal": "napi universal",
"version": "napi version"
}
Expand Down
19 changes: 15 additions & 4 deletions packages/livekit-rtc/src/audio_mixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import { AsyncQueue } from './async_queue.js';
import { AudioFrame } from './audio_frame.js';
import { log } from './log.js';

// Re-export AsyncQueue for backward compatibility
export { AsyncQueue } from './async_queue.js';
Expand Down Expand Up @@ -238,7 +239,7 @@ export class AudioMixer {

for (const result of results) {
if (result.status !== 'fulfilled') {
console.warn('AudioMixer: Stream contribution failed:', result.reason);
log.warn('AudioMixer: Stream contribution failed:', result.reason);
continue;
}

Expand All @@ -261,6 +262,12 @@ export class AudioMixer {
this.removeStream(stream);
}

// If all streams are exhausted, end the mixer automatically.
// This keeps `for await...of` consumers from hanging indefinitely when inputs complete.
if (!this.ending && removals.length > 0 && this.streams.size === 0) {
this.ending = true;
}

if (!anyData) {
// No data available from any stream, wait briefly before trying again
await this.sleep(1);
Expand Down Expand Up @@ -372,25 +379,29 @@ export class AudioMixer {
}

const length = this.chunkSize * this.numChannels;
const mixed = new Int16Array(length);
// Use a wider accumulator to avoid int16 overflow while summing.
const acc = new Int32Array(length);

// Sum all contributions
for (const contrib of contributions) {
for (let i = 0; i < length; i++) {
const val = contrib[i];
if (val !== undefined) {
mixed[i] = (mixed[i] ?? 0) + val;
acc[i] = (acc[i] ?? 0) + val;
}
}
}

// Clip to Int16 range
const mixed = new Int16Array(length);
for (let i = 0; i < length; i++) {
const val = mixed[i] ?? 0;
const val = acc[i] ?? 0;
if (val > 32767) {
mixed[i] = 32767;
} else if (val < -32768) {
mixed[i] = -32768;
} else {
mixed[i] = val;
}
}

Expand Down
39 changes: 23 additions & 16 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}

get creationTime(): Date {
return new Date(Number(this.info?.creationTime ?? 0));
// TODO: workaround for Rust SDK bug, remove after updating to:
// https://github.com/livekit/rust-sdks/pull/822
// check if creationTime looks like seconds (less than year 3000 in ms), convert to ms if needed
let creationTimeMs = Number(this.info?.creationTime ?? 0);
if (creationTimeMs > 0 && creationTimeMs < 1e12) {
creationTimeMs *= 1000;
}
return new Date(creationTimeMs);
}

get isRecording(): boolean {
Expand Down Expand Up @@ -362,7 +369,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
participant.info.disconnectReason = ev.value.disconnectReason;
this.emit(RoomEvent.ParticipantDisconnected, participant);
} else {
console.log(`RoomEvent.ParticipantDisconnected: Could not find participant`);
log.warn(`RoomEvent.ParticipantDisconnected: Could not find participant`);
}
} else if (ev.case == 'localTrackPublished') {
const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!);
Expand All @@ -377,15 +384,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
publication.resolveFirstSubscription();
this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!);
} else {
console.warn(`RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`);
log.warn(`RoomEvent.LocalTrackSubscribed: Publication not found: ${ev.value.trackSid}`);
}
} else if (ev.case == 'trackPublished') {
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
const publication = new RemoteTrackPublication(ev.value.publication!);
if (participant) {
participant.trackPublications.set(publication.sid!, publication);
} else {
console.warn(
log.warn(
`RoomEvent.TrackPublished: Could not find participant: ${ev.value.participantIdentity}`,
);
}
Expand All @@ -397,7 +404,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
if (publication) {
this.emit(RoomEvent.TrackUnpublished, publication, participant);
} else {
console.warn(`RoomEvent.TrackUnpublished: Could not find publication`);
log.warn(`RoomEvent.TrackUnpublished: Could not find publication`);
}
} else if (ev.case == 'trackSubscribed') {
const ownedTrack = ev.value.track!;
Expand All @@ -416,7 +423,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>

this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`);
log.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackUnsubscribed') {
try {
Expand All @@ -429,7 +436,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
publication.subscribed = false;
this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`);
log.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackSubscriptionFailed') {
try {
Expand All @@ -441,7 +448,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
ev.value.error,
);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`);
log.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackMuted') {
try {
Expand All @@ -455,7 +462,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}
this.emit(RoomEvent.TrackMuted, publication, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`);
log.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`);
}
} else if (ev.case == 'trackUnmuted') {
try {
Expand All @@ -469,7 +476,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}
this.emit(RoomEvent.TrackUnmuted, publication, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`);
log.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`);
}
} else if (ev.case == 'activeSpeakersChanged') {
try {
Expand All @@ -478,7 +485,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
);
this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
} catch (e: unknown) {
console.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'roomMetadataChanged') {
this.info.metadata = ev.value.metadata ?? '';
Expand All @@ -489,15 +496,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
participant.info.metadata = ev.value.metadata;
this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'participantNameChanged') {
try {
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.name = ev.value.name;
this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'participantAttributesChanged') {
try {
Expand All @@ -520,14 +527,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant);
}
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'connectionQualityChanged') {
try {
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant);
} catch (e: unknown) {
console.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'chatMessage') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
Expand Down Expand Up @@ -612,7 +619,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
participant,
);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`);
log.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`);
}
}
};
Expand Down
Loading
Loading