diff --git a/packages/errors/src/errors/StreamReadError.test.ts b/packages/errors/src/errors/StreamReadError.test.ts index fd013fc5f..a733f5550 100644 --- a/packages/errors/src/errors/StreamReadError.test.ts +++ b/packages/errors/src/errors/StreamReadError.test.ts @@ -34,6 +34,18 @@ describe('StreamReadError', () => { expect(error.cause).toBe(mockOriginalError); }); + it('creates a StreamReadError for Kernel with the correct properties', () => { + const error = new StreamReadError( + { kernelId: 'kernel' }, + { cause: mockOriginalError }, + ); + expect(error).toBeInstanceOf(StreamReadError); + expect(error.code).toBe(ErrorCode.StreamReadError); + expect(error.message).toBe('Unexpected stream read error.'); + expect(error.data).toStrictEqual({ kernelId: 'kernel' }); + expect(error.cause).toBe(mockOriginalError); + }); + it('unmarshals a valid marshaled StreamReadError for Vat', () => { const data = { vatId: mockVatId }; const marshaledError: MarshaledOcapError = { @@ -106,7 +118,7 @@ describe('StreamReadError', () => { expect(() => StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions), ).toThrow( - 'At path: data -- Expected the value to satisfy a union of `object | object`, but received: "invalid data"', + 'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: "invalid data"', ); }); @@ -127,7 +139,7 @@ describe('StreamReadError', () => { expect(() => StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions), ).toThrow( - 'At path: data -- Expected the value to satisfy a union of `object | object`, but received: [object Object]', + 'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: [object Object]', ); }); }); diff --git a/packages/errors/src/errors/StreamReadError.ts b/packages/errors/src/errors/StreamReadError.ts index ec81a52b0..b43c3202a 100644 --- a/packages/errors/src/errors/StreamReadError.ts +++ b/packages/errors/src/errors/StreamReadError.ts @@ -16,7 +16,10 @@ import { } from '../constants.js'; import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.js'; -type StreamReadErrorData = { vatId: string } | { supervisorId: string }; +type StreamReadErrorData = + | { vatId: string } + | { supervisorId: string } + | { kernelId: string }; type StreamReadErrorOptions = Required & Pick; @@ -36,8 +39,21 @@ export class StreamReadError extends BaseError { ...marshaledErrorSchema, code: literal(ErrorCode.StreamReadError), data: union([ - object({ vatId: string(), supervisorId: optional(never()) }), - object({ supervisorId: string(), vatId: optional(never()) }), + object({ + vatId: string(), + supervisorId: optional(never()), + kernelId: optional(never()), + }), + object({ + supervisorId: string(), + vatId: optional(never()), + kernelId: optional(never()), + }), + object({ + kernelId: string(), + vatId: optional(never()), + supervisorId: optional(never()), + }), ]), cause: MarshaledErrorStruct, }); diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index 96a921f97..ade802455 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -1,19 +1,18 @@ import './kernel-worker-trusted-prelude.js'; +import type { NonEmptyArray } from '@metamask/utils'; import type { KernelCommand, KernelCommandReply, VatId } from '@ocap/kernel'; -import { Kernel } from '@ocap/kernel'; +import { Kernel, VatCommandMethod } from '@ocap/kernel'; import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams'; import { makeSQLKVStore } from './sqlite-kv-store.js'; import { ExtensionVatWorkerClient } from './VatWorkerClient.js'; -main('v0').catch(console.error); +main().catch(console.error); /** * The main function for the kernel worker. - * - * @param defaultVatId - The id to give the default vat. */ -async function main(defaultVatId: VatId): Promise { +async function main(): Promise { const kernelStream = await receiveMessagePort( (listener) => globalThis.addEventListener('message', listener), (listener) => globalThis.removeEventListener('message', listener), @@ -27,11 +26,55 @@ async function main(defaultVatId: VatId): Promise { ); // Initialize kernel store. - const kvStore = await makeSQLKVStore(); // Create and start kernel. - const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); - await kernel.init({ defaultVatId }); + await kernel.init(); + + // Handle the lifecycle of multiple vats. + await runVatLifecycle(kernel, ['v1', 'v2', 'v3']); + + // Add default vat. + await kernel.launchVat({ id: 'v0' }); +} + +/** + * Runs the full lifecycle of an array of vats, including their creation, + * restart, message passing, and termination. + * + * @param kernel The kernel instance. + * @param vats An array of VatIds to be managed. + */ +async function runVatLifecycle( + kernel: Kernel, + vats: NonEmptyArray, +): Promise { + console.time(`Created vats: ${vats.join(', ')}`); + await Promise.all(vats.map(async (id) => kernel.launchVat({ id }))); + console.timeEnd(`Created vats: ${vats.join(', ')}`); + + console.log('Kernel vats:', kernel.getVatIds().join(', ')); + + // Restart a randomly selected vat from the array. + const vatToRestart = vats[Math.floor(Math.random() * vats.length)] as VatId; + console.time(`Vat "${vatToRestart}" restart`); + await kernel.restartVat(vatToRestart); + console.timeEnd(`Vat "${vatToRestart}" restart`); + + // Send a "Ping" message to a randomly selected vat. + const vatToPing = vats[Math.floor(Math.random() * vats.length)] as VatId; + console.time(`Ping Vat "${vatToPing}"`); + await kernel.sendMessage(vatToPing, { + method: VatCommandMethod.Ping, + params: null, + }); + console.timeEnd(`Ping Vat "${vatToPing}"`); + + const vatIds = kernel.getVatIds().join(', '); + console.time(`Terminated vats: ${vatIds}`); + await kernel.terminateAllVats(); + console.timeEnd(`Terminated vats: ${vatIds}`); + + console.log(`Kernel has ${kernel.getVatIds().length} vats`); } diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 0ae90b0bc..7ade18203 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -40,6 +40,7 @@ describe('Kernel', () => { mockWorkerService = { launch: async () => ({}), terminate: async () => undefined, + terminateAll: async () => undefined, } as unknown as VatWorkerService; launchWorkerMock = vi @@ -88,6 +89,15 @@ describe('Kernel', () => { expect(kernel.getVatIds()).toStrictEqual(['v0']); }); + it('adds multiple vats to the kernel without errors when no vat with the same ID exists', async () => { + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); + await kernel.launchVat({ id: 'v0' }); + await kernel.launchVat({ id: 'v1' }); + expect(initMock).toHaveBeenCalledTimes(2); + expect(launchWorkerMock).toHaveBeenCalledTimes(2); + expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); + }); + it('throws an error when launching a vat that already exists in the kernel', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); @@ -101,12 +111,12 @@ describe('Kernel', () => { }); }); - describe('deleteVat()', () => { + describe('terminateVat()', () => { it('deletes a vat from the kernel without errors when the vat exists', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); - await kernel.deleteVat('v0'); + await kernel.terminateVat('v0'); expect(terminateMock).toHaveBeenCalledOnce(); expect(terminateWorkerMock).toHaveBeenCalledOnce(); expect(kernel.getVatIds()).toStrictEqual([]); @@ -116,7 +126,7 @@ describe('Kernel', () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); const nonExistentVatId: VatId = 'v9'; await expect(async () => - kernel.deleteVat(nonExistentVatId), + kernel.terminateVat(nonExistentVatId), ).rejects.toThrow(VatNotFoundError); expect(terminateMock).not.toHaveBeenCalled(); }); @@ -125,12 +135,41 @@ describe('Kernel', () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); vi.spyOn(Vat.prototype, 'terminate').mockRejectedValueOnce('Test error'); - await expect(async () => kernel.deleteVat('v0')).rejects.toThrow( + await expect(async () => kernel.terminateVat('v0')).rejects.toThrow( 'Test error', ); }); }); + describe('terminateAllVats()', () => { + it('deletes all vats from the kernel without errors', async () => { + const workerTerminateAllMock = vi + .spyOn(mockWorkerService, 'terminateAll') + .mockResolvedValue(undefined); + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); + await kernel.launchVat({ id: 'v0' }); + await kernel.launchVat({ id: 'v1' }); + expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); + await kernel.terminateAllVats(); + expect(terminateMock).toHaveBeenCalledTimes(2); + expect(workerTerminateAllMock).toHaveBeenCalledOnce(); + expect(kernel.getVatIds()).toStrictEqual([]); + }); + }); + + describe('restartVat()', () => { + it('restarts a vat', async () => { + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); + await kernel.launchVat({ id: 'v0' }); + expect(kernel.getVatIds()).toStrictEqual(['v0']); + await kernel.restartVat('v0'); + expect(terminateMock).toHaveBeenCalledOnce(); + expect(terminateWorkerMock).toHaveBeenCalledOnce(); + expect(kernel.getVatIds()).toStrictEqual(['v0']); + expect(initMock).toHaveBeenCalledTimes(2); + }); + }); + describe('sendMessage()', () => { it('sends a message to the vat without errors when the vat exists', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); @@ -174,5 +213,7 @@ describe('Kernel', () => { it.todo('initializes the kernel store'); it.todo('starts receiving messages'); + + it.todo('throws if the stream throws'); }); }); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 784bd53ba..8e6d7f31e 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -1,7 +1,10 @@ import '@ocap/shims/endoify'; -import type { PromiseKit } from '@endo/promise-kit'; -import { makePromiseKit } from '@endo/promise-kit'; -import { VatAlreadyExistsError, VatNotFoundError, toError } from '@ocap/errors'; +import { + StreamReadError, + VatAlreadyExistsError, + VatNotFoundError, + toError, +} from '@ocap/errors'; import type { DuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, stringify } from '@ocap/utils'; @@ -29,9 +32,6 @@ export class Kernel { readonly #storage: KVStore; - // Hopefully removed when we get to n+1 vats. - readonly #defaultVatKit: PromiseKit; - readonly #logger: Logger; constructor( @@ -44,16 +44,19 @@ export class Kernel { this.#vats = new Map(); this.#vatWorkerService = vatWorkerService; this.#storage = storage; - this.#defaultVatKit = makePromiseKit(); this.#logger = logger ?? makeLogger('[ocap kernel]'); } - async init({ defaultVatId }: { defaultVatId: VatId }): Promise { - await this.launchVat({ id: defaultVatId }) - .then(this.#defaultVatKit.resolve) - .catch(this.#defaultVatKit.reject); - - return this.#receiveMessages(); + async init(): Promise { + this.#receiveMessages().catch((error) => { + this.#logger.error('Stream read error occurred:', error); + // Errors thrown here will not be surfaced in the usual synchronous manner + // because #receiveMessages() is awaited within the constructor. + // Any error thrown inside the async loop is 'caught' within this constructor + // call stack but will be displayed as 'Uncaught (in promise)' + // since they occur after the constructor has returned. + throw new StreamReadError({ kernelId: 'kernel' }, error); + }); } async #receiveMessages(): Promise { @@ -72,14 +75,20 @@ export class Kernel { await this.#reply({ method, params: 'pong' }); break; case KernelCommandMethod.Evaluate: - vat = await this.#defaultVatKit.promise; + if (!this.#vats.size) { + throw new Error('No vats available to call'); + } + vat = this.#vats.values().next().value as Vat; await this.#reply({ method, params: await this.evaluate(vat.id, params), }); break; case KernelCommandMethod.CapTpCall: - vat = await this.#defaultVatKit.promise; + if (!this.#vats.size) { + throw new Error('No vats available to call'); + } + vat = this.#vats.values().next().value as Vat; await this.#reply({ method, params: stringify(await vat.callCapTp(params)), @@ -155,7 +164,7 @@ export class Kernel { } /** - * Gets the vat IDs in the kernel. + * Gets the vat IDs. * * @returns An array of vat IDs. */ @@ -164,7 +173,7 @@ export class Kernel { } /** - * Launches a vat in the kernel. + * Launches a vat. * * @param options - The options for launching the vat. * @param options.id - The ID of the vat. @@ -182,17 +191,41 @@ export class Kernel { } /** - * Deletes a vat from the kernel. + * Restarts a vat. * * @param id - The ID of the vat. */ - async deleteVat(id: VatId): Promise { + async restartVat(id: VatId): Promise { + await this.terminateVat(id); + await this.launchVat({ id }); + } + + /** + * Terminate a vat. + * + * @param id - The ID of the vat. + */ + async terminateVat(id: VatId): Promise { const vat = this.#getVat(id); await vat.terminate(); await this.#vatWorkerService.terminate(id).catch(console.error); this.#vats.delete(id); } + /** + * Terminate all vats. + */ + async terminateAllVats(): Promise { + await Promise.all( + this.getVatIds().map(async (id) => { + const vat = this.#getVat(id); + await vat.terminate(); + this.#vats.delete(id); + }), + ); + await this.#vatWorkerService.terminateAll(); + } + /** * Send a message to a vat. * @@ -209,7 +242,7 @@ export class Kernel { } /** - * Gets a vat from the kernel. + * Gets a vat. * * @param id - The ID of the vat. * @returns The vat.