Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC implementation via FFI #276

Merged
merged 141 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 88 commits
Commits
Show all changes
141 commits
Select commit Hold shift + click to select a range
d7e3a4d
draft
bcherry Sep 20, 2024
bc4d82c
wip
bcherry Sep 20, 2024
9e052e9
wip
bcherry Sep 20, 2024
d64a4b5
wip
bcherry Sep 20, 2024
a55c07e
harness
bcherry Sep 20, 2024
6a36334
working
bcherry Sep 20, 2024
d7eae1a
working
bcherry Sep 20, 2024
0083605
better
bcherry Sep 20, 2024
6aa8a61
better
bcherry Sep 20, 2024
dd659ce
better
bcherry Sep 20, 2024
bb870a2
fixes
bcherry Sep 20, 2024
e648522
Do it
bcherry Sep 20, 2024
69719dd
comments
bcherry Sep 20, 2024
ead34bb
Create itchy-cheetahs-taste.md
bcherry Sep 20, 2024
b7a36aa
switch to registers
bcherry Sep 21, 2024
c5d1e52
cleanup
bcherry Sep 21, 2024
c6d80a8
autoack
bcherry Sep 21, 2024
4c70522
Clean up
bcherry Sep 21, 2024
45a819f
Merge remote-tracking branch 'origin/bcherry/rpc' into bcherry/rpc
bcherry Sep 21, 2024
3a925ea
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Sep 21, 2024
f216ac1
updates
bcherry Sep 21, 2024
710800e
fixes
bcherry Sep 21, 2024
d963dc5
Clean up exampeles
bcherry Sep 21, 2024
a4e441f
simplify
bcherry Sep 21, 2024
208993a
logs
bcherry Sep 21, 2024
5fae10f
simple
bcherry Sep 21, 2024
7dba8ba
readme
bcherry Sep 21, 2024
319ea58
Update README.md
bcherry Sep 21, 2024
0c5cbe8
Cleanup
bcherry Sep 22, 2024
912638c
reuse
bcherry Sep 23, 2024
30ad557
lint
bcherry Sep 23, 2024
71dde9f
format
bcherry Sep 23, 2024
00a06ca
better error handling
bcherry Sep 23, 2024
e4320f3
fmt
bcherry Sep 23, 2024
5fe1317
max length
bcherry Sep 23, 2024
924108a
add participant disconnected
bcherry Sep 23, 2024
d4ac570
docs
bcherry Sep 23, 2024
70800ef
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Sep 23, 2024
0697269
Error updates
bcherry Sep 24, 2024
5506684
int
bcherry Sep 24, 2024
fe6eba6
remove stub
bcherry Sep 24, 2024
9051646
tests
bcherry Sep 24, 2024
5bc0387
tests
bcherry Sep 24, 2024
805c590
ther
bcherry Sep 24, 2024
5048cf3
other tests
bcherry Sep 24, 2024
ed3f991
update workflows
bcherry Sep 24, 2024
4297d95
reuse
bcherry Sep 24, 2024
b3bd24b
Improvement
bcherry Sep 24, 2024
d27b9ae
fmt
bcherry Sep 24, 2024
e800bf5
Fix error in example
bcherry Sep 24, 2024
b186125
Cleanup
bcherry Sep 24, 2024
bff53e5
fix
bcherry Sep 24, 2024
7e8e23b
Format
bcherry Sep 24, 2024
abb1c6a
Comments
bcherry Sep 24, 2024
a872a41
rm
bcherry Sep 24, 2024
597e889
fix
bcherry Sep 24, 2024
0d9da4b
More errors
bcherry Sep 24, 2024
1a2fe78
things
bcherry Sep 24, 2024
23fd42f
simplify
bcherry Sep 24, 2024
2d6c3e4
Merge branch 'main' into bcherry/rpc
bcherry Oct 2, 2024
b961a1a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 4, 2024
085e130
proto
bcherry Oct 4, 2024
ba4e8f3
almost
bcherry Oct 4, 2024
a99a6eb
tests
bcherry Oct 4, 2024
564ba85
rust
bcherry Oct 4, 2024
506708a
proto
bcherry Oct 4, 2024
0dd7e2d
fixes
bcherry Oct 4, 2024
9661339
Fixing example code
bcherry Oct 4, 2024
1a33b02
It works
bcherry Oct 5, 2024
2d4b7e6
amost
bcherry Oct 7, 2024
092ff98
wip
bcherry Oct 7, 2024
5248bd2
update
bcherry Oct 7, 2024
0b4affe
rst
bcherry Oct 7, 2024
3009afe
logging
bcherry Oct 7, 2024
bcb80a1
fixes
bcherry Oct 8, 2024
928808a
fixes
bcherry Oct 8, 2024
6c85ff4
it works
bcherry Oct 8, 2024
c4271b9
updates
bcherry Oct 8, 2024
246af97
cleanup
bcherry Oct 8, 2024
5a81ab9
fixes
bcherry Oct 8, 2024
ddd92cc
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 8, 2024
c023906
rust
bcherry Oct 8, 2024
cea2fcf
readme
bcherry Oct 8, 2024
7532f01
remove tests
bcherry Oct 8, 2024
f8bd869
fmt
bcherry Oct 8, 2024
ea7dec3
lint
bcherry Oct 8, 2024
b0c4879
opt
bcherry Oct 8, 2024
bebe539
number
bcherry Oct 8, 2024
d9d8a96
sender->caller
bcherry Oct 8, 2024
9d98f01
performRpcRequest->performRpc
bcherry Oct 8, 2024
b5102e1
fmt
bcherry Oct 8, 2024
453c6da
Updated lint
bcherry Oct 8, 2024
b506416
timeoutMs
bcherry Oct 8, 2024
f10ac1a
r
bcherry Oct 8, 2024
9ddfeb5
Changes
bcherry Oct 8, 2024
1439802
Merge branch 'main' into bcherry/rpc
bcherry Oct 8, 2024
71f935c
fix
bcherry Oct 8, 2024
3ab2ddf
dv
bcherry Oct 8, 2024
d2e329d
lint
bcherry Oct 8, 2024
d8d69be
typo
bcherry Oct 10, 2024
3193e31
rust
bcherry Oct 11, 2024
6834b0e
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 11, 2024
aa9c8cf
fix
bcherry Oct 11, 2024
8f065a2
use identity
bcherry Oct 15, 2024
902f3d7
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 15, 2024
b39f61a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 17, 2024
85bda31
r
bcherry Oct 18, 2024
a21904f
p
bcherry Oct 18, 2024
386b7a2
r
bcherry Oct 18, 2024
fa6266c
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 21, 2024
8b4c1e8
p
bcherry Oct 21, 2024
b9573c4
fmt
bcherry Oct 21, 2024
fe07b80
fixes
bcherry Oct 21, 2024
aea624c
p
bcherry Oct 21, 2024
052a4a5
r
bcherry Oct 22, 2024
aad63ef
pb
bcherry Oct 22, 2024
ea3af0e
Update comment in livekit-rtc/generate_proto.sh
bcherry Oct 22, 2024
f062c4e
nl
bcherry Oct 22, 2024
c3e401c
Merge branch 'bcherry/protoc-2' into bcherry/rpc
bcherry Oct 22, 2024
1ee7857
fixes
bcherry Oct 22, 2024
7c2a356
wip
lukasIO Oct 23, 2024
c26fb11
fix remaining issues
lukasIO Oct 23, 2024
e4a202a
Merge branch 'lukas/strict-fixes' into bcherry/rpc
bcherry Oct 23, 2024
2778e79
restore
bcherry Oct 23, 2024
522a7d7
fix
bcherry Oct 23, 2024
30bc129
fmt
bcherry Oct 23, 2024
912c065
remove callbacks
bcherry Oct 23, 2024
4e23899
more
bcherry Oct 23, 2024
f7d2ada
proto
bcherry Oct 23, 2024
d63c079
2
bcherry Oct 23, 2024
020a3d3
e
bcherry Oct 23, 2024
0eebf8a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 24, 2024
70c1f2b
c
bcherry Oct 24, 2024
b196668
clean
bcherry Oct 24, 2024
90a6688
unneeded
bcherry Oct 24, 2024
3907a89
r
bcherry Oct 24, 2024
d64e40c
fmt
bcherry Oct 24, 2024
f1f1648
fix
bcherry Oct 24, 2024
bd305d2
fmt
bcherry Oct 24, 2024
1afbd44
lint
bcherry Oct 24, 2024
f6d6dfa
fmt
bcherry Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/itchy-cheetahs-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"example-rpc": minor
"@livekit/rtc-node": minor
---

Native RPC support
6 changes: 6 additions & 0 deletions examples/rpc/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env
# 2. Update the enviroment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
39 changes: 39 additions & 0 deletions examples/rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# RPC Example

This example demonstrates how to use RPC between two participants with LiveKit.

The example includes two scenarios:
1. A simple greeting exchange.
2. A contrived function-calling service with JSON data payloads and multiple method types.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:
```
pnpm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:
```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

To run the example, use the following command:

```
pnpm run start
```

The example will log to your terminal.
176 changes: 176 additions & 0 deletions examples/rpc/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import { Room, RpcError } from '@livekit/rtc-node';
import type { RemoteParticipant } from '@livekit/rtc-node';
import { randomBytes } from 'crypto';
import { config } from 'dotenv';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

async function main() {
const roomName = `rpc-test-${randomBytes(4).toString('hex')}`;

console.log(`Connecting participants to room: ${roomName}`);

const [requestersRoom, greetersRoom, mathGeniusRoom] = await Promise.all([
connectParticipant('requester', roomName),
connectParticipant('greeter', roomName),
connectParticipant('math-genius', roomName),
]);

// Register all methods for the receiving participant
await registerReceiverMethods(greetersRoom, mathGeniusRoom);

try {
console.log('\n\nRunning greeting example...');
await Promise.all([performGreeting(requestersRoom)]);
} catch (error) {
console.error('Error:', error);
}

try {
console.log('\n\nRunning math example...');
await Promise.all([
performSquareRoot(requestersRoom)
.then(() => new Promise<void>((resolve) => setTimeout(resolve, 2000)))
.then(() => performQuantumHypergeometricSeries(requestersRoom)),
]);
} catch (error) {
console.error('Error:', error);
}

console.log('\n\nParticipants done, disconnecting...');
await requestersRoom.disconnect();
await greetersRoom.disconnect();
await mathGeniusRoom.disconnect();

console.log('Participants disconnected. Example completed.');

process.exit(0);
}

const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room): Promise<void> => {
await greetersRoom.localParticipant?.registerRpcMethod(
'arrival',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (requestId: string, sender: RemoteParticipant, payload: string, responseTimeoutMs: number) => {
console.log(`[Greeter] Oh ${sender.identity} arrived and said "${payload}"`);
await new Promise((resolve) => setTimeout(resolve, 2000));
return "Welcome and have a wonderful day!";
},
bcherry marked this conversation as resolved.
Show resolved Hide resolved
);

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'square-root',
async (requestId: string, sender: RemoteParticipant, payload: string, responseTimeoutMs: number) => {
const jsonData = JSON.parse(payload);
const number = jsonData.number;
console.log(
`[Math Genius] I guess ${sender.identity} wants the square root of ${number}. I've only got ${responseTimeoutMs / 1000} seconds to respond but I think I can pull it off.`,
);

console.log(`[Math Genius] *doing math*…`);
await new Promise((resolve) => setTimeout(resolve, 2000));

const result = Math.sqrt(number);
console.log(`[Math Genius] Aha! It's ${result}`);
return JSON.stringify({ result });
},
);
};

const performGreeting = async (room: Room): Promise<void> => {
console.log('[Requester] Letting the greeter know that I\'ve arrived');
try {
const response = await room.localParticipant!.performRpcRequest('greeter', 'arrival', 'Hello');
console.log(`[Requester] That's nice, the greeter said: "${response}"`);
} catch (error) {
console.error('[Requester] RPC call failed:', error);
throw error;
}
bcherry marked this conversation as resolved.
Show resolved Hide resolved
};

const performSquareRoot = async (room: Room): Promise<void> => {
console.log("[Requester] What's the square root of 16?");
try {
const response = await room.localParticipant!.performRpcRequest('math-genius', 'square-root', JSON.stringify({ number: 16 }));
const parsedResponse = JSON.parse(response);
console.log(`[Requester] Nice, the answer was ${parsedResponse.result}`);
} catch (error) {
console.error('[Requester] RPC call failed:', error);
throw error;
}
};

const performQuantumHypergeometricSeries = async (room: Room): Promise<void> => {
console.log("[Requester] What's the quantum hypergeometric series of 42?");
try {
const response = await room.localParticipant!.performRpcRequest(
'math-genius',
'quantum-hypergeometric-series',
JSON.stringify({ number: 42 })
);
const parsedResponse = JSON.parse(response);
console.log(`[Requester] genius says ${parsedResponse.result}!`);
} catch (error) {
if (error instanceof RpcError) {
if (error.code === RpcError.ErrorCode.UNSUPPORTED_METHOD) {
console.log(`[Requester] Aww looks like the genius doesn't know that one.`);
return;
}
}

console.error('[Requester] Unexpected error:', error);
throw error;
}
};

const createToken = (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
canPublish: true,
canSubscribe: true,
});
return token.toJwt();
};

const connectParticipant = async (identity: string, roomName: string): Promise<Room> => {
const room = new Room();
const token = await createToken(identity, roomName);

room.on('disconnected', () => {
console.log(`[${identity}] Disconnected from room`);
});

await room.connect(LIVEKIT_URL, token);

await Promise.race([
new Promise<void>((resolve) => {
if (room.remoteParticipants.size > 0) {
resolve();
} else {
const onParticipantConnected = () => {
room.off('participantConnected', onParticipantConnected);
resolve();
};
room.on('participantConnected', onParticipantConnected);
}
}),
new Promise<void>((_, reject) => {
setTimeout(() => reject(new Error('Timed out waiting for participants')), 5000);
}),
]);

return room;
};

main();
23 changes: 23 additions & 0 deletions examples/rpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-rpc",
"author": "LiveKit",
"private": "true",
"description": "Example of using RPC in LiveKit",
"type": "module",
"main": "index.ts",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"start": "tsx index.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
51 changes: 49 additions & 2 deletions packages/livekit-rtc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,57 @@ await room.localParticipant.publishTrack(track, options);
await source.captureFrame(new AudioFrame(buffer, 16000, 1, buffer.byteLength / 2));
```

### RPC

RPC you to perform your own predefined method calls from one participant to another. This feature is especially powerful when used with [Agents](https://docs.livekit.io/agents), for instance to forward LLM function calls to your client application.
bcherry marked this conversation as resolved.
Show resolved Hide resolved

#### Registering an RPC method

The participant who will receive a call must first register for the specific method:

```typescript
room.localParticipant?.registerRpcMethod(
'greet',
async (requestId: string, sender: RemoteParticipant, payload: string, responseTimeoutMs: number) => {
console.log(`Received greeting from ${sender.identity}: ${payload}`);
return `Hello, ${sender.identity}!`;
}
);
```

The request includes a `responseTimeoutMs` field, which informs you how long you have to return a response. If you are unable to respond in time, you can either send an error or let the request time out on the sender's side.

#### Performing an RPC request

The caller may then initiate a request like so:

```typescript
try {
const response = await room.localParticipant!.performRpcRequest(
'recipient-identity',
'greet',
'Hello from RPC!'
);
console.log('RPC response:', response);
} catch (error) {
console.error('RPC call failed:', error);
}
```

You may find it useful to adjust the `responseTimeoutMs` parameter, which allows you to set the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.

#### Errors

LiveKit is a dynamic realtime environment and calls can fail for various reasons.

You may throw errors of the type `RpcError` with a string `message` in an RPC method handler and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as `1500` ("Application Error"). Other built-in errors are detailed in `RpcError`.

## Examples

- [`publish-wav`](https://github.com/livekit/node-sdks/tree/main/examples/publish-wav): connect to a room and publish a wave file
- [`publish-wav`](https://github.com/livekit/node-sdks/tree/main/examples/publish-wav): connect to a room and publish a .wave file
- [ `rpc`](https://github.com/livekit/node-sdks/tree/main/examples/rpc): simple back-and-forth RPC interaction
bcherry marked this conversation as resolved.
Show resolved Hide resolved


## Getting help / Contributing

Please join us on [Slack](https://livekit.io/join-slack) to get help from our devs/community. We welcome your contributions and details can be discussed there.
Please join us on [Slack](https://livekit.io/join-slack) to get help from our devs & community. We welcome your contributions and details can be discussed there.
3 changes: 2 additions & 1 deletion packages/livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ protoc \
$FFI_PROTOCOL/track.proto \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/rpc.proto
1 change: 1 addition & 0 deletions packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export {
TrackPublishOptions,
ConnectionState,
} from './proto/room_pb.js';
export { RpcError } from './rpc.js';
export { EncryptionType, EncryptionState } from './proto/e2ee_pb.js';
export { StreamState, TrackKind, TrackSource } from './proto/track_pb.js';
export { VideoBufferType, VideoRotation, VideoCodec } from './proto/video_frame_pb.js';
Expand Down
13 changes: 8 additions & 5 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

/* auto-generated by NAPI-RS */

export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
Loading