Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC implementation via FFI #276

Merged
merged 141 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
141 commits
Select commit Hold shift + click to select a range
d7e3a4d
draft
bcherry Sep 20, 2024
bc4d82c
wip
bcherry Sep 20, 2024
9e052e9
wip
bcherry Sep 20, 2024
d64a4b5
wip
bcherry Sep 20, 2024
a55c07e
harness
bcherry Sep 20, 2024
6a36334
working
bcherry Sep 20, 2024
d7eae1a
working
bcherry Sep 20, 2024
0083605
better
bcherry Sep 20, 2024
6aa8a61
better
bcherry Sep 20, 2024
dd659ce
better
bcherry Sep 20, 2024
bb870a2
fixes
bcherry Sep 20, 2024
e648522
Do it
bcherry Sep 20, 2024
69719dd
comments
bcherry Sep 20, 2024
ead34bb
Create itchy-cheetahs-taste.md
bcherry Sep 20, 2024
b7a36aa
switch to registers
bcherry Sep 21, 2024
c5d1e52
cleanup
bcherry Sep 21, 2024
c6d80a8
autoack
bcherry Sep 21, 2024
4c70522
Clean up
bcherry Sep 21, 2024
45a819f
Merge remote-tracking branch 'origin/bcherry/rpc' into bcherry/rpc
bcherry Sep 21, 2024
3a925ea
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Sep 21, 2024
f216ac1
updates
bcherry Sep 21, 2024
710800e
fixes
bcherry Sep 21, 2024
d963dc5
Clean up exampeles
bcherry Sep 21, 2024
a4e441f
simplify
bcherry Sep 21, 2024
208993a
logs
bcherry Sep 21, 2024
5fae10f
simple
bcherry Sep 21, 2024
7dba8ba
readme
bcherry Sep 21, 2024
319ea58
Update README.md
bcherry Sep 21, 2024
0c5cbe8
Cleanup
bcherry Sep 22, 2024
912638c
reuse
bcherry Sep 23, 2024
30ad557
lint
bcherry Sep 23, 2024
71dde9f
format
bcherry Sep 23, 2024
00a06ca
better error handling
bcherry Sep 23, 2024
e4320f3
fmt
bcherry Sep 23, 2024
5fe1317
max length
bcherry Sep 23, 2024
924108a
add participant disconnected
bcherry Sep 23, 2024
d4ac570
docs
bcherry Sep 23, 2024
70800ef
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Sep 23, 2024
0697269
Error updates
bcherry Sep 24, 2024
5506684
int
bcherry Sep 24, 2024
fe6eba6
remove stub
bcherry Sep 24, 2024
9051646
tests
bcherry Sep 24, 2024
5bc0387
tests
bcherry Sep 24, 2024
805c590
ther
bcherry Sep 24, 2024
5048cf3
other tests
bcherry Sep 24, 2024
ed3f991
update workflows
bcherry Sep 24, 2024
4297d95
reuse
bcherry Sep 24, 2024
b3bd24b
Improvement
bcherry Sep 24, 2024
d27b9ae
fmt
bcherry Sep 24, 2024
e800bf5
Fix error in example
bcherry Sep 24, 2024
b186125
Cleanup
bcherry Sep 24, 2024
bff53e5
fix
bcherry Sep 24, 2024
7e8e23b
Format
bcherry Sep 24, 2024
abb1c6a
Comments
bcherry Sep 24, 2024
a872a41
rm
bcherry Sep 24, 2024
597e889
fix
bcherry Sep 24, 2024
0d9da4b
More errors
bcherry Sep 24, 2024
1a2fe78
things
bcherry Sep 24, 2024
23fd42f
simplify
bcherry Sep 24, 2024
2d6c3e4
Merge branch 'main' into bcherry/rpc
bcherry Oct 2, 2024
b961a1a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 4, 2024
085e130
proto
bcherry Oct 4, 2024
ba4e8f3
almost
bcherry Oct 4, 2024
a99a6eb
tests
bcherry Oct 4, 2024
564ba85
rust
bcherry Oct 4, 2024
506708a
proto
bcherry Oct 4, 2024
0dd7e2d
fixes
bcherry Oct 4, 2024
9661339
Fixing example code
bcherry Oct 4, 2024
1a33b02
It works
bcherry Oct 5, 2024
2d4b7e6
amost
bcherry Oct 7, 2024
092ff98
wip
bcherry Oct 7, 2024
5248bd2
update
bcherry Oct 7, 2024
0b4affe
rst
bcherry Oct 7, 2024
3009afe
logging
bcherry Oct 7, 2024
bcb80a1
fixes
bcherry Oct 8, 2024
928808a
fixes
bcherry Oct 8, 2024
6c85ff4
it works
bcherry Oct 8, 2024
c4271b9
updates
bcherry Oct 8, 2024
246af97
cleanup
bcherry Oct 8, 2024
5a81ab9
fixes
bcherry Oct 8, 2024
ddd92cc
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 8, 2024
c023906
rust
bcherry Oct 8, 2024
cea2fcf
readme
bcherry Oct 8, 2024
7532f01
remove tests
bcherry Oct 8, 2024
f8bd869
fmt
bcherry Oct 8, 2024
ea7dec3
lint
bcherry Oct 8, 2024
b0c4879
opt
bcherry Oct 8, 2024
bebe539
number
bcherry Oct 8, 2024
d9d8a96
sender->caller
bcherry Oct 8, 2024
9d98f01
performRpcRequest->performRpc
bcherry Oct 8, 2024
b5102e1
fmt
bcherry Oct 8, 2024
453c6da
Updated lint
bcherry Oct 8, 2024
b506416
timeoutMs
bcherry Oct 8, 2024
f10ac1a
r
bcherry Oct 8, 2024
9ddfeb5
Changes
bcherry Oct 8, 2024
1439802
Merge branch 'main' into bcherry/rpc
bcherry Oct 8, 2024
71f935c
fix
bcherry Oct 8, 2024
3ab2ddf
dv
bcherry Oct 8, 2024
d2e329d
lint
bcherry Oct 8, 2024
d8d69be
typo
bcherry Oct 10, 2024
3193e31
rust
bcherry Oct 11, 2024
6834b0e
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 11, 2024
aa9c8cf
fix
bcherry Oct 11, 2024
8f065a2
use identity
bcherry Oct 15, 2024
902f3d7
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 15, 2024
b39f61a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 17, 2024
85bda31
r
bcherry Oct 18, 2024
a21904f
p
bcherry Oct 18, 2024
386b7a2
r
bcherry Oct 18, 2024
fa6266c
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 21, 2024
8b4c1e8
p
bcherry Oct 21, 2024
b9573c4
fmt
bcherry Oct 21, 2024
fe07b80
fixes
bcherry Oct 21, 2024
aea624c
p
bcherry Oct 21, 2024
052a4a5
r
bcherry Oct 22, 2024
aad63ef
pb
bcherry Oct 22, 2024
ea3af0e
Update comment in livekit-rtc/generate_proto.sh
bcherry Oct 22, 2024
f062c4e
nl
bcherry Oct 22, 2024
c3e401c
Merge branch 'bcherry/protoc-2' into bcherry/rpc
bcherry Oct 22, 2024
1ee7857
fixes
bcherry Oct 22, 2024
7c2a356
wip
lukasIO Oct 23, 2024
c26fb11
fix remaining issues
lukasIO Oct 23, 2024
e4a202a
Merge branch 'lukas/strict-fixes' into bcherry/rpc
bcherry Oct 23, 2024
2778e79
restore
bcherry Oct 23, 2024
522a7d7
fix
bcherry Oct 23, 2024
30bc129
fmt
bcherry Oct 23, 2024
912c065
remove callbacks
bcherry Oct 23, 2024
4e23899
more
bcherry Oct 23, 2024
f7d2ada
proto
bcherry Oct 23, 2024
d63c079
2
bcherry Oct 23, 2024
020a3d3
e
bcherry Oct 23, 2024
0eebf8a
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 24, 2024
70c1f2b
c
bcherry Oct 24, 2024
b196668
clean
bcherry Oct 24, 2024
90a6688
unneeded
bcherry Oct 24, 2024
3907a89
r
bcherry Oct 24, 2024
d64e40c
fmt
bcherry Oct 24, 2024
f1f1648
fix
bcherry Oct 24, 2024
bd305d2
fmt
bcherry Oct 24, 2024
1afbd44
lint
bcherry Oct 24, 2024
f6d6dfa
fmt
bcherry Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/itchy-cheetahs-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"example-rpc": minor
"@livekit/rtc-node": minor
---

Native RPC support
6 changes: 6 additions & 0 deletions examples/rpc/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env
# 2. Update the enviroment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
39 changes: 39 additions & 0 deletions examples/rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# RPC Example

This example demonstrates how to use RPC between two participants with LiveKit.

The example includes two scenarios:
1. A simple greeting exchange.
2. A contrived function-calling service with JSON data payloads and multiple method types.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:
```
npm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:
```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

To run the example, use the following command:

```
npm run start
```

The example will log to your terminal.
172 changes: 172 additions & 0 deletions examples/rpc/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { RemoteParticipant, Room, RoomEvent, RpcRequest } from '@livekit/rtc-node';
import { RPC_ERROR_UNSUPPORTED_METHOD } from '@livekit/rtc-node/dist/rpc';
import { randomBytes } from 'crypto';
import { config } from 'dotenv';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

async function main() {
const roomName = `rpc-test-${randomBytes(4).toString('hex')}`;

console.log(`Connecting participants to room: ${roomName}`);

const [requestersRoom, greetersRoom, mathGeniusRoom] = await Promise.all([
connectParticipant('requester', roomName),
connectParticipant('greeter', roomName),
connectParticipant('math-genius', roomName),
]);

// Register all methods for the receiving participant
await registerReceiverMethods(greetersRoom, mathGeniusRoom);

try {
console.log('\n\nRunning greeting example...');
await Promise.all([performGreeting(requestersRoom)]);
} catch (error) {
console.error('Error:', error);
}

try {
console.log('\n\nRunning math example...');
await Promise.all([
performSquareRoot(requestersRoom)
.then(() => new Promise<void>((resolve) => setTimeout(resolve, 2000)))
.then(() => performQuantumHypergeometricSeries(requestersRoom)),
]);
} catch (error) {
console.error('Error:', error);
}

console.log('\n\nParticipants done, disconnecting...');
await requestersRoom.disconnect();
await greetersRoom.disconnect();
await mathGeniusRoom.disconnect();

console.log('Participants disconnected. Example completed.');

process.exit(0);
}

const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room): Promise<void> => {
greetersRoom.localParticipant?.registerRpcMethod(
'arrival',
async (request: RpcRequest, sender: RemoteParticipant) => {
console.log(`[Greeter] Oh ${sender.identity} arrived and said "${request.payload}"`);
await new Promise((resolve) => setTimeout(resolve, 2000));
return "Welcome and have a wonderful day!";
},
bcherry marked this conversation as resolved.
Show resolved Hide resolved
);

mathGeniusRoom.localParticipant?.registerRpcMethod(
'square-root',
async (request: RpcRequest, sender: RemoteParticipant) => {
const jsonData = JSON.parse(request.payload);
const number = jsonData.number;
console.log(
`[Math Genius] I guess ${sender.identity} wants the square root of ${number}. I've only got ${request.responseTimeoutMs / 1000} seconds to respond but I think I can pull it off.`,
);

console.log(`[Math Genius] *doing math*…`);
await new Promise((resolve) => setTimeout(resolve, 2000));

const result = Math.sqrt(number);
console.log(`[Math Genius] Aha! It's ${result}`);
return JSON.stringify({ result });
},
);
};

const performGreeting = async (room: Room): Promise<void> => {
console.log('[Requester] Letting the greeter know that I\'ve arrived');
try {
const response = await room.localParticipant!.performRpcRequest('greeter', 'arrival', 'Hello');
console.log(`[Requester] That's nice, the greeter said: "${response}"`);
} catch (error) {
console.error('[Requester] RPC call failed:', error);
throw error;
}
bcherry marked this conversation as resolved.
Show resolved Hide resolved
};

const performSquareRoot = async (room: Room): Promise<void> => {
console.log("[Requester] What's the square root of 16?");
try {
const response = await room.localParticipant!.performRpcRequest('math-genius', 'square-root', JSON.stringify({ number: 16 }));
const parsedResponse = JSON.parse(response);
console.log(`[Requester] Nice, the answer was ${parsedResponse.result}`);
} catch (error) {
console.error('[Requester] RPC call failed:', error);
throw error;
}
};

const performQuantumHypergeometricSeries = async (room: Room): Promise<void> => {
console.log("[Requester] What's the quantum hypergeometric series of 42?");
try {
const response = await room.localParticipant!.performRpcRequest(
'math-genius',
'quantum-hypergeometric-series',
JSON.stringify({ number: 42 })
);
const parsedResponse = JSON.parse(response);
console.log(`[Requester] genius says ${parsedResponse.result}!`);
} catch (error) {
if (error instanceof Error && error.message === RPC_ERROR_UNSUPPORTED_METHOD) {
console.log(`[Requester] Aww looks like the genius doesn't know that one.`);
} else {
console.error('[Requester] Unexpected error:', error);
throw error;
}
}
};

const createToken = (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
canPublish: true,
canSubscribe: true,
});
return token.toJwt();
};

const connectParticipant = async (identity: string, roomName: string): Promise<Room> => {
const room = new Room();
const token = await createToken(identity, roomName);

room.on('disconnected', () => {
console.log(`[${identity}] Disconnected from room`);
});

await room.connect(LIVEKIT_URL, token);

await Promise.race([
new Promise<void>((resolve) => {
if (room.remoteParticipants.size > 0) {
resolve();
} else {
const onParticipantConnected = () => {
room.off('participantConnected', onParticipantConnected);
resolve();
};
room.on('participantConnected', onParticipantConnected);
}
}),
new Promise<void>((_, reject) => {
setTimeout(() => reject(new Error('Timed out waiting for participants')), 5000);
}),
]);

return room;
};

main();
23 changes: 23 additions & 0 deletions examples/rpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-rpc",
"author": "LiveKit",
"private": "true",
"description": "Example of using RPC in LiveKit",
"type": "module",
"main": "index.ts",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"start": "tsx index.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
46 changes: 46 additions & 0 deletions packages/livekit-rtc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,56 @@ await room.localParticipant.publishTrack(track, options);
await source.captureFrame(new AudioFrame(buffer, 16000, 1, buffer.byteLength / 2));
```

### RPC

LiveKit now supports RPC, allowing participants to call methods on other participants in the room. This feature is especially useful in combination with [Agents](https://docs.livekit.io/agents).

#### Registering an RPC method

To make a method available for remote calls, you need to register it (on the participant who will receive the call):

```typescript
room.localParticipant?.registerRpcMethod(
'greet',
async (request: RpcRequest, sender: RemoteParticipant) => {
console.log(`Received greeting from ${sender.identity}: ${request.payload}`);
return `Hello, ${sender.identity}!`;
}
);
```

#### Performing an RPC request

To call a method on a remote participant:

```typescript
try {
const response = await room.localParticipant!.performRpcRequest(
'recipient-identity',
'greet',
'Hello from RPC!'
);
console.log('RPC response:', response);
} catch (error) {
console.error('RPC call failed:', error);
}
```

#### Error Handling

LiveKit is a dynamic realtime environment and calls can fail for various reasons:

The recipient doesn't support the requested method (RPC_ERROR_UNSUPPORTED_METHOD)
The call times out waiting for an acknowledgment (RPC_ERROR_ACK_TIMEOUT)
The call times out waiting for a response (RPC_ERROR_RESPONSE_TIMEOUT)

In addition, you may throw errors in your method handler to return an error back to the caller.

## Examples

- [`publish-wav`](https://github.com/livekit/node-sdks/tree/main/examples/publish-wav): connect to a room and publish a wave file


## Getting help / Contributing

Please join us on [Slack](https://livekit.io/join-slack) to get help from our devs/community. We welcome your contributions and details can be discussed there.
1 change: 1 addition & 0 deletions packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export {
TrackPublishOptions,
ConnectionState,
} from './proto/room_pb.js';
export { RpcRequest, RpcAck, RpcResponse } from './rpc.js';
export { EncryptionType, EncryptionState } from './proto/e2ee_pb.js';
export { StreamState, TrackKind, TrackSource } from './proto/track_pb.js';
export { VideoBufferType, VideoRotation } from './proto/video_frame_pb.js';
Expand Down
13 changes: 8 additions & 5 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

/* auto-generated by NAPI-RS */

export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
Loading