Skip to content
242 changes: 235 additions & 7 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,72 @@ describe('PlatformServicesClient', () => {
await expect(resultP).rejects.toThrow('foo');
});

it('calls logger.debug when receiving an unexpected reply', async () => {
const debugSpy = vi.spyOn(clientLogger, 'debug');
const unexpectedReply = makeNullReply('m9');
it('calls logger.error when receiving response with non-string id', async () => {
const errorSpy = vi.spyOn(clientLogger, 'error');
const unexpectedReply = new MessageEvent('message', {
data: {
id: 123,
result: null,
jsonrpc: '2.0',
},
});

await stream.receiveInput(unexpectedReply);
await delay(10);

expect(debugSpy).toHaveBeenCalledOnce();
expect(debugSpy).toHaveBeenLastCalledWith(
'Received response with unexpected id "m9".',
expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenLastCalledWith(
'Received response with unexpected id:',
expect.any(String),
);
});

it('calls logger.error when receiving message with unexpected port', async () => {
const errorSpy = vi.spyOn(clientLogger, 'error');
const unexpectedPortReply = makeMessageEvent(
'm9',
{ result: null },
new MessageChannel().port1,
);

await stream.receiveInput(unexpectedPortReply);
await delay(10);

expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenLastCalledWith(
'Received message with unexpected port:',
expect.any(String),
);
});

it('handles request with unknown method by sending error response', async () => {
const outputs: MessageEventWithPayload[] = [];
const newStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
// eslint-disable-next-line no-new -- test setup
new PlatformServicesClient(
newStream as unknown as PlatformServicesClientStream,
);

await newStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm1',
jsonrpc: '2.0',
method: 'unknownMethod',
params: {},
},
}),
);
await delay(10);

const errorResponse = outputs.find(
(message) => message.payload?.id === 'm1' && 'error' in message.payload,
);
expect(errorResponse).toBeDefined();
});

describe('launch', () => {
it('resolves with a duplex stream when receiving a launch reply', async () => {
const vatId: VatId = 'v0';
Expand Down Expand Up @@ -191,7 +244,50 @@ describe('PlatformServicesClient', () => {
const remoteHandler = vi.fn(async () => 'response');
const result = client.initializeRemoteComms(
'0xabcd',
['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
{ relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'] },
remoteHandler,
);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});

it('sends initializeRemoteComms with all options and resolves', async () => {
const remoteHandler = vi.fn(async () => 'response');
const result = client.initializeRemoteComms(
'0xabcd',
{
relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
maxRetryAttempts: 5,
maxQueue: 100,
},
remoteHandler,
);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});

it('sends initializeRemoteComms with onRemoteGiveUp callback', async () => {
const remoteHandler = vi.fn(async () => 'response');
const giveUpHandler = vi.fn();
const result = client.initializeRemoteComms(
'0xabcd',
{ relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'] },
remoteHandler,
giveUpHandler,
);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});

it('filters undefined values from options', async () => {
const remoteHandler = vi.fn(async () => 'response');
const result = client.initializeRemoteComms(
'0xabcd',
{
relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
maxRetryAttempts: undefined,
maxQueue: undefined,
},
remoteHandler,
);
await stream.receiveInput(makeNullReply('m1'));
Expand Down Expand Up @@ -283,6 +379,138 @@ describe('PlatformServicesClient', () => {
);
expect(errorResponse).toBeDefined();
});

it('calls handler and returns response when handler is set', async () => {
const outputs: MessageEventWithPayload[] = [];
const testStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
const testClient = new PlatformServicesClient(
testStream as unknown as PlatformServicesClientStream,
clientLogger,
);
// Wait for client to be ready
await delay(10);

const remoteHandler = vi.fn(async () => 'response-message');
const initP = testClient.initializeRemoteComms(
'0xabcd',
{},
remoteHandler,
);
await testStream.receiveInput(makeNullReply('m1'));
await initP;

await testStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm2',
jsonrpc: '2.0',
method: 'remoteDeliver',
params: {
from: 'peer-123',
message: 'test-message',
},
},
}),
);
await delay(50);

expect(remoteHandler).toHaveBeenCalledOnce();
expect(remoteHandler).toHaveBeenCalledWith(
'peer-123',
'test-message',
);

const successResponse = outputs.find(
(message) =>
message.payload?.id === 'm2' &&
'result' in message.payload &&
message.payload.result === 'response-message',
);
expect(successResponse).toBeDefined();
});
});

describe('remoteGiveUp', () => {
it('calls handler when handler is set', async () => {
const outputs: MessageEventWithPayload[] = [];
const testStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
const testClient = new PlatformServicesClient(
testStream as unknown as PlatformServicesClientStream,
clientLogger,
);
// Wait for client to be ready
await delay(10);

const remoteHandler = vi.fn(async () => 'response');
const giveUpHandler = vi.fn();
const initP = testClient.initializeRemoteComms(
'0xabcd',
{},
remoteHandler,
giveUpHandler,
);
await testStream.receiveInput(makeNullReply('m1'));
await initP;

await testStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm2',
jsonrpc: '2.0',
method: 'remoteGiveUp',
params: { peerId: 'peer-456' },
},
}),
);
await delay(50);

expect(giveUpHandler).toHaveBeenCalledOnce();
expect(giveUpHandler).toHaveBeenCalledWith('peer-456');

const successResponse = outputs.find(
(message) =>
message.payload?.id === 'm2' &&
'result' in message.payload &&
message.payload.result === null,
);
expect(successResponse).toBeDefined();
});

it('does not throw when handler is not set', async () => {
const outputs: MessageEventWithPayload[] = [];
const newStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
// eslint-disable-next-line no-new -- test setup
new PlatformServicesClient(
newStream as unknown as PlatformServicesClientStream,
);

await newStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm1',
jsonrpc: '2.0',
method: 'remoteGiveUp',
params: { peerId: 'peer-789' },
},
}),
);
await delay(10);

// Should have sent success response with null result
const successResponse = outputs.find(
(message) =>
message.payload?.id === 'm1' &&
'result' in message.payload &&
message.payload.result === null,
);
expect(successResponse).toBeDefined();
});
});
});
});
Expand Down
31 changes: 28 additions & 3 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
RemoteMessageHandler,
VatId,
VatConfig,
RemoteCommsOptions,
} from '@metamask/ocap-kernel';
import {
platformServicesMethodSpecs,
Expand Down Expand Up @@ -47,6 +48,8 @@ export class PlatformServicesClient implements PlatformServices {

#remoteMessageHandler: RemoteMessageHandler | undefined = undefined;

#remoteGiveUpHandler: ((peerId: string) => void) | undefined = undefined;

/**
* **ATTN:** Prefer {@link PlatformServicesClient.make} over constructing
* this class directly.
Expand Down Expand Up @@ -80,6 +83,7 @@ export class PlatformServicesClient implements PlatformServices {
);
this.#rpcServer = new RpcService(kernelRemoteHandlers, {
remoteDeliver: this.#remoteDeliver.bind(this),
remoteGiveUp: this.#remoteGiveUp.bind(this),
});

// Start draining messages immediately after construction
Expand Down Expand Up @@ -171,21 +175,29 @@ export class PlatformServicesClient implements PlatformServices {
* Initialize network communications.
*
* @param keySeed - The seed for generating this kernel's secret key.
* @param knownRelays - Array of the peerIDs of relay nodes that can be used to listen for incoming
* @param options - Options for remote communications initialization.
* @param options.relays - Array of the peerIDs of relay nodes that can be used to listen for incoming
* connections from other kernels.
* @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default).
* @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200).
* @param remoteMessageHandler - A handler function to receive remote messages.
* @param onRemoteGiveUp - Optional callback to be called when we give up on a remote.
* @returns A promise that resolves once network access has been established
* or rejects if there is some problem doing so.
*/
async initializeRemoteComms(
keySeed: string,
knownRelays: string[],
options: RemoteCommsOptions,
remoteMessageHandler: (from: string, message: string) => Promise<string>,
onRemoteGiveUp?: (peerId: string) => void,
): Promise<void> {
this.#remoteMessageHandler = remoteMessageHandler;
this.#remoteGiveUpHandler = onRemoteGiveUp;
await this.#rpcClient.call('initializeRemoteComms', {
keySeed,
knownRelays,
...Object.fromEntries(
Object.entries(options).filter(([, value]) => value !== undefined),
),
});
}

Expand Down Expand Up @@ -252,6 +264,19 @@ export class PlatformServicesClient implements PlatformServices {
throw Error(`remote message handler not set`);
}

/**
* Handle a remote give up notification from the server.
*
* @param peerId - The peer ID of the remote we're giving up on.
* @returns A promise that resolves when handling is complete.
*/
async #remoteGiveUp(peerId: string): Promise<null> {
if (this.#remoteGiveUpHandler) {
this.#remoteGiveUpHandler(peerId);
}
return null;
}

/**
* Send a message to the server.
*
Expand Down
Loading
Loading