diff --git a/sdks/actors/client/src/client.ts b/sdks/actors/client/src/client.ts index b1e4266016..3f96b59c64 100644 --- a/sdks/actors/client/src/client.ts +++ b/sdks/actors/client/src/client.ts @@ -7,6 +7,7 @@ import type { import type { CreateRequest } from "../../manager-protocol/src/query.ts"; import { ActorHandleRaw } from "./handle.ts"; import { logger } from "./log.ts"; +import * as errors from "./errors.ts"; export interface GetOpts { parameters?: unknown; @@ -189,22 +190,25 @@ export class Client { path: string, body?: Request, ): Promise { - const managerEndpoint = await this.#managerEndpointPromise; - const res = await fetch(`${managerEndpoint}${path}`, { - method, - headers: { - "Content-Type": "application/json", - }, - body: body ? JSON.stringify(body) : undefined, - }); + try { + const managerEndpoint = await this.#managerEndpointPromise; + const res = await fetch(`${managerEndpoint}${path}`, { + method, + headers: { + "Content-Type": "application/json", + }, + body: body ? JSON.stringify(body) : undefined, + }); - if (!res.ok) { - throw new Error( - `Manager error (${res.statusText}):\n${await res.text()}`, - ); - } + if (!res.ok) { + throw new errors.ManagerError(`${res.statusText}: ${await res.text()}`); + } + + return res.json(); + } catch (error) { + throw new errors.ManagerError(String(error), { cause: error }); - return res.json(); + } } async #fetchRegion(): Promise { @@ -226,9 +230,12 @@ export class Client { const res = await fetch(url.toString()); if (!res.ok) { - throw new Error( - `Failed to fetch region (${res.statusText}):\n${await res.text()}`, - ); + // Add safe fallback in case we can't fetch the region + logger().error("failed to fetch region, defaulting to manager region", { + status: res.statusText, + body: await res.text(), + }); + return undefined; } const { region }: { region: Region } = await res.json(); diff --git a/sdks/actors/client/src/errors.ts b/sdks/actors/client/src/errors.ts new file mode 100644 index 0000000000..ab36377852 --- /dev/null +++ b/sdks/actors/client/src/errors.ts @@ -0,0 +1,32 @@ +import { MAX_CONN_PARAMS_SIZE } from "../../common/src/network.ts"; + +export class ActorClientError extends Error { + +} + +export class InternalError extends ActorClientError { +} + +export class ManagerError extends ActorClientError { + constructor(error: string, opts?: ErrorOptions) { + super(`Manager error: ${error}`, opts); + } +} + +export class ConnectionParametersTooLong extends ActorClientError { + constructor() { + super(`Connection parameters must be less than ${MAX_CONN_PARAMS_SIZE} bytes`); + } +} + +export class MalformedResponseMessage extends ActorClientError { + constructor(cause?: unknown) { + super(`Malformed response message: ${cause}`, { cause }); + } +} + +export class RpcError extends ActorClientError { + constructor(public readonly code: string, message: string, public readonly metadata?: unknown) { + super(message); + } +} diff --git a/sdks/actors/client/src/handle.ts b/sdks/actors/client/src/handle.ts index d0606561c8..e10e432a05 100644 --- a/sdks/actors/client/src/handle.ts +++ b/sdks/actors/client/src/handle.ts @@ -4,6 +4,7 @@ import { assertUnreachable } from "../../common/src/utils.ts"; import * as wsToClient from "../../protocol/src/ws/to_client.ts"; import type * as wsToServer from "../../protocol/src/ws/to_server.ts"; import { logger } from "./log.ts"; +import * as errors from "./errors.ts"; interface RpcInFlight { resolve: (response: wsToClient.RpcResponseOk) => void; @@ -101,9 +102,7 @@ export class ActorHandleRaw { // TODO: This is an imprecise count since it doesn't count the full URL length & URI encoding expansion in the URL size if (paramsStr.length > MAX_CONN_PARAMS_SIZE) { - throw new Error( - `Connection parameters must be less than ${MAX_CONN_PARAMS_SIZE} bytes`, - ); + throw new errors.ConnectionParametersTooLong(); } url += `¶ms=${encodeURIComponent(paramsStr)}`; @@ -154,7 +153,8 @@ export class ActorHandleRaw { ws.onmessage = (ev) => { const rawData = ev.data; if (typeof rawData !== "string") { - throw new Error("Response data was not string"); + logger().warn("response message was not string"); + throw new errors.MalformedResponseMessage(); } const response: wsToClient.ToClient = JSON.parse(rawData); @@ -167,14 +167,14 @@ export class ActorHandleRaw { response.body.rpcResponseError.id, ); inFlight.reject( - new Error(`RPC error: ${response.body.rpcResponseError.message}`), + new errors.RpcError(response.body.rpcResponseError.code, response.body.rpcResponseError.message, response.body.rpcResponseError.metadata) ); } else if ("event" in response.body) { this.#dispatchEvent(response.body.event); } else if ("error" in response.body) { logger().warn( "unhandled error from actor", - { message: response.body.error.message }, + { code: response.body.error.code, message: response.body.error.message, metadata: response.body.error.metadata } ); } else { assertUnreachable(response.body); @@ -184,7 +184,7 @@ export class ActorHandleRaw { #takeRpcInFlight(id: string): RpcInFlight { const inFlight = this.#websocketRpcInFlight.get(id); - if (!inFlight) throw new Error(`No in flight response for ${id}`); + if (!inFlight) throw new errors.InternalError(`No in flight response for ${id}`); this.#websocketRpcInFlight.delete(id); return inFlight; } diff --git a/sdks/actors/client/src/test.ts b/sdks/actors/client/src/test.ts index 0e83ab9a85..d4f730dcaa 100644 --- a/sdks/actors/client/src/test.ts +++ b/sdks/actors/client/src/test.ts @@ -1,5 +1,6 @@ import { Client } from "./mod.ts"; import { setupLogging } from "../../common/src/log.ts"; +import { InternalError } from "./errors.ts"; /** * Uses the Rivet CLI to read the manager endpoint to connect to. This allows @@ -12,7 +13,7 @@ export async function readEndpointFromCli(): Promise { args: ["manager", "endpoint"], }).output(); if (!output.success) { - throw new Error( + throw new InternalError( `Read endpoint failed with ${output.code}:\n${new TextDecoder().decode( output.stderr, )}`, diff --git a/sdks/actors/common/src/network.ts b/sdks/actors/common/src/network.ts index 01d27f21b8..21d19b4b00 100644 --- a/sdks/actors/common/src/network.ts +++ b/sdks/actors/common/src/network.ts @@ -1,3 +1,4 @@ export const PORT_NAME: string = "http"; +/** Only enforced client-side in order to prevent building malformed URLs. */ export const MAX_CONN_PARAMS_SIZE = 4096; diff --git a/sdks/actors/protocol/src/ws/to_client.ts b/sdks/actors/protocol/src/ws/to_client.ts index 678c079131..b611da36db 100644 --- a/sdks/actors/protocol/src/ws/to_client.ts +++ b/sdks/actors/protocol/src/ws/to_client.ts @@ -13,7 +13,9 @@ export interface RpcResponseOk { export interface RpcResponseError { id: string; + code: string; message: string; + metadata?: unknown; } export interface ToClientEvent { @@ -22,5 +24,7 @@ export interface ToClientEvent { } export interface ToClientError { + code: string; message: string; + metadata?: unknown; } diff --git a/sdks/actors/runtime/src/actor.ts b/sdks/actors/runtime/src/actor.ts index 60873f5e32..302327e69f 100644 --- a/sdks/actors/runtime/src/actor.ts +++ b/sdks/actors/runtime/src/actor.ts @@ -7,7 +7,6 @@ import { type Context as HonoContext, Hono } from "hono"; import { upgradeWebSocket } from "hono/deno"; import type { WSEvents } from "hono/ws"; import onChange from "on-change"; -import { MAX_CONN_PARAMS_SIZE } from "../../common/src/network.ts"; import { assertUnreachable } from "../../common/src/utils.ts"; import type * as wsToClient from "../../protocol/src/ws/to_client.ts"; import type * as wsToServer from "../../protocol/src/ws/to_server.ts"; @@ -17,6 +16,7 @@ import { Rpc } from "./rpc.ts"; import { instanceLogger, logger } from "./log.ts"; import { setupLogging } from "../../common/src/log.ts"; import type { Logger } from "@std/log/get-logger"; +import * as errors from "./errors.ts"; const KEYS = { SCHEDULE: { @@ -26,7 +26,6 @@ const KEYS = { return [...this.EVENT_PREFIX, id]; }, }, - // Shutting down is not part of the state because you can't meaningfully handle the state change STATE: { INITIALIZED: ["actor", "state", "initialized"], DATA: ["actor", "state", "data"], @@ -58,12 +57,12 @@ function isJsonSerializable(value: unknown): boolean { return false; } -export interface OnBeforeConnectOpts { +export interface OnBeforeConnectOptions { request: Request; parameters: ConnParams; } -export interface SaveStateOpts { +export interface SaveStateOptions { /** * Forces the state to be saved immediately. This function will return when the state has save successfully. */ @@ -151,9 +150,7 @@ export abstract class Actor< #validateStateEnabled() { if (!this.#stateEnabled) { - throw new Error( - "State not enabled. Must implement createState to use state.", - ); + throw new errors.StateNotEnabled(); } } @@ -198,7 +195,7 @@ export abstract class Actor< /** Updates the state and creates a new proxy. */ #setStateWithoutChange(value: State) { if (!isJsonSerializable(value)) { - throw new Error("State must be JSON serializable"); + throw new errors.InvalidStateType(); } this.#stateProxy = this.#createStateProxy(value); this.#stateRaw = value; @@ -208,7 +205,7 @@ export abstract class Actor< // If this can't be proxied, return raw value if (target === null || typeof target !== "object") { if (!isJsonSerializable(target)) { - throw new Error("State value must be JSON serializable"); + throw new errors.InvalidStateType(); } return target; } @@ -224,9 +221,7 @@ export abstract class Actor< // biome-ignore lint/suspicious/noExplicitAny: Don't know types in proxy (path: any, value: any, _previousValue: any, _applyData: any) => { if (!isJsonSerializable(value)) { - throw new Error( - `State value at path "${path}" must be JSON serializable`, - ); + throw new errors.InvalidStateType({ path }); } this.#stateChanged = true; @@ -390,16 +385,12 @@ export abstract class Actor< // Validate protocol const protocolVersion = c.req.query("version"); if (protocolVersion !== "1") { - throw new Error(`Invalid protocol version: ${protocolVersion}`); + logger().warn("invalid protocol version", { protocolVersion }); + throw new errors.InvalidProtocolVersion(protocolVersion); } // Validate params size (limiting to 4KB which is reasonable for query params) const paramsStr = c.req.query("params"); - if (paramsStr && paramsStr.length > MAX_CONN_PARAMS_SIZE) { - throw new Error( - `WebSocket params too large (max ${MAX_CONN_PARAMS_SIZE} bytes)`, - ); - } // Parse and validate params let params: ConnParams; @@ -407,8 +398,9 @@ export abstract class Actor< params = typeof paramsStr === "string" ? JSON.parse(paramsStr) : undefined; - } catch (err) { - throw new Error(`Invalid WebSocket params: ${err}`); + } catch (error) { + logger().warn("malformed connection parameters", { error }); + throw new errors.MalformedConnectionParameters(error); } // Authenticate connection @@ -468,9 +460,10 @@ export abstract class Actor< try { const value = evt.data.valueOf(); if (typeof value !== "string") { - throw new Error("message must be string"); + logger().warn("received non-string message"); + throw new errors.MalformedMessage(); } - // TODO: Validate message + const message: wsToServer.ToServer = JSON.parse(value); if ("rpcRequest" in message.body) { @@ -508,6 +501,20 @@ export abstract class Actor< assertUnreachable(message.body); } } catch (err) { + // Build response error information. Only return errors if flagged as public in order to prevent leaking internal behavior. + let code: string; + let message: string; + let metadata: unknown = undefined; + if (err instanceof errors.ActorError && err.public) { + code = err.code; + message = String(err); + metadata = err.metadata; + } else { + code = errors.INTERNAL_ERROR_CODE; + message = errors.INTERNAL_ERROR_DESCRIPTION; + } + + // Build response if (rpcRequestId) { ws.send( JSON.stringify( @@ -515,7 +522,9 @@ export abstract class Actor< body: { rpcResponseError: { id: rpcRequestId, - message: String(err), + code, + message, + metadata, }, }, } satisfies wsToClient.ToClient, @@ -527,7 +536,9 @@ export abstract class Actor< { body: { error: { - message: String(err), + code, + message, + metadata, }, }, } satisfies wsToClient.ToClient, @@ -560,7 +571,7 @@ export abstract class Actor< } #assertReady() { - if (!this.#ready) throw new Error("Actor not ready"); + if (!this.#ready) throw new errors.InternalError("Actor not ready"); } async #executeRpc( @@ -570,14 +581,16 @@ export abstract class Actor< ): Promise { // Prevent calling private or reserved methods if (!this.#isValidRpc(rpcName)) { - throw new Error(`RPC ${rpcName} is not accessible`); + logger().warn("attempted to call invalid rpc", { rpcName }); + throw new errors.RpcNotFound(); } // Check if the method exists on this object // biome-ignore lint/suspicious/noExplicitAny: RPC name is dynamic from client const rpcFunction = (this as any)[rpcName]; if (typeof rpcFunction !== "function") { - throw new Error(`RPC ${rpcName} not found`); + logger().warn("rpc not found", { rpcName }); + throw new errors.RpcNotFound(); } // TODO: pass abortable to the rpc to decide when to abort @@ -592,9 +605,15 @@ export abstract class Actor< } } catch (error) { if (error instanceof DOMException && error.name === "TimeoutError") { - throw new Error(`RPC ${rpcName} timed out`); + logger().error("rpc timed out", { rpcName }); + throw new errors.RpcTimedOut(); } else { - throw new Error(`RPC ${rpcName} failed: ${String(error)}`); + if (error instanceof errors.UserError) { + logger().info("rpc raised user error", { rpcName, error }); + } else { + logger().error("rpc raised error", { rpcName, error }); + } + throw error; } } finally { this.#saveStateThrottled(); @@ -609,7 +628,7 @@ export abstract class Actor< protected _onStateChange?(newState: State): void | Promise; protected _onBeforeConnect?( - opts: OnBeforeConnectOpts, + opts: OnBeforeConnectOptions, ): ConnState | Promise; protected _onConnect?(connection: Connection): void | Promise; @@ -691,7 +710,7 @@ export abstract class Actor< * This is helpful if running a long task that may fail later or when * running a background job that updates the state. */ - protected async _saveState(opts: SaveStateOpts) { + protected async _saveState(opts: SaveStateOptions) { this.#assertReady(); if (this.#stateChanged) { diff --git a/sdks/actors/runtime/src/connection.ts b/sdks/actors/runtime/src/connection.ts index 7919580671..20055e83e7 100644 --- a/sdks/actors/runtime/src/connection.ts +++ b/sdks/actors/runtime/src/connection.ts @@ -2,6 +2,7 @@ import { assertExists } from "@std/assert/exists"; import type { WSContext } from "hono/ws"; import type * as wsToClient from "../../protocol/src/ws/to_client.ts"; import type { Actor, AnyActor } from "./actor.ts"; +import * as errors from "./errors.ts"; // biome-ignore lint/suspicious/noExplicitAny: Must be used for `extends` type GetConnStateType = A extends Actor @@ -37,9 +38,7 @@ export class Connection { #validateStateEnabled() { if (!this.#stateEnabled) { - throw new Error( - "Connection state not enabled. Must implement prepareConnection to use connection state.", - ); + throw new errors.ConnectionStateNotEnabled(); } } diff --git a/sdks/actors/runtime/src/errors.ts b/sdks/actors/runtime/src/errors.ts new file mode 100644 index 0000000000..961c9ec4d3 --- /dev/null +++ b/sdks/actors/runtime/src/errors.ts @@ -0,0 +1,107 @@ +export const INTERNAL_ERROR_CODE = "internal_error"; +export const INTERNAL_ERROR_DESCRIPTION = "Internal error. Read the actor logs for more details."; + +export const USER_ERROR_CODE = "user_error"; + +interface ActorErrorOptions extends ErrorOptions { + /** Error data can safely be serialized in a response to the client. */ + public?: boolean; + /** Metadata assocaited with this error. This will be sent to clients. */ + metadata?: unknown; +} + +export class ActorError extends Error { + public public: boolean; + public metadata?: unknown; + + constructor(public readonly code: string, message: string, opts?: ActorErrorOptions) { + super(message, { cause: opts?.cause }); + this.public = opts?.public ?? false; + this.metadata = opts?.metadata; + } +} + +export class InternalError extends ActorError { + constructor(message: string) { + super(INTERNAL_ERROR_CODE, message); + } +} + +export class Unreachable extends InternalError { + constructor(x: never) { + super(`Unreachable case: ${x}`); + } +} + +export class StateNotEnabled extends ActorError { + constructor() { + super("state_not_enabled", "State not enabled. Must implement `_onInitialize` to use state."); + } +} + +export class ConnectionStateNotEnabled extends ActorError { + constructor() { + super( + "connection_state_not_enabled", + "Connection state not enabled. Must implement `_onBeforeConnect` to use connection state.", + ); + } +} + +export class RpcTimedOut extends ActorError { + constructor() { + super("rpc_timed_out", "RPC timed out.", { public: true }); + } +} + +export class RpcNotFound extends ActorError { + constructor() { + super("rpc_not_found", "RPC not found.", { public: true }); + } +} + +export class InvalidProtocolVersion extends ActorError { + constructor(version?: string) { + super("invalid_protocol_version", `Invalid protocol version \`${version}\`.`, { public: true }); + } +} + +export class MalformedConnectionParameters extends ActorError { + constructor(cause: unknown) { + super("malformed_connnection_parameters", `Malformed connection parameters: ${cause}`, { public: true, cause }); + } +} + +export class MalformedMessage extends ActorError { + constructor(cause?: unknown) { + super("malformed_message", `Malformed message: ${cause}`, { public: true, cause }); + } +} + +export interface InvalidStateTypeOptions { + path?: unknown; +} + +export class InvalidStateType extends ActorError { + constructor(opts?: InvalidStateTypeOptions) { + const msg = + (opts?.path + ? `Attempted to set invalid state at path \`${opts.path}\`.` + : "Attempted to set invalid state.") + + " State must be JSON serializable."; + super("invalid_state_type", msg); + } +} + +export interface UserErrorOptions extends ErrorOptions { + code?: string; + meta: unknown; +} + +/** Error that can be safely returned to the user. Usually used to indicate a user error. */ +export class UserError extends ActorError { + constructor(message: string, opts?: UserErrorOptions) { + super(opts?.code ?? USER_ERROR_CODE, message); + } +} + diff --git a/sdks/actors/runtime/src/mod.ts b/sdks/actors/runtime/src/mod.ts index d7ad508a94..c02e631dbf 100644 --- a/sdks/actors/runtime/src/mod.ts +++ b/sdks/actors/runtime/src/mod.ts @@ -1,6 +1,6 @@ export { Actor } from "./actor.ts"; -export type { OnBeforeConnectOpts } from "./actor.ts"; - +export { UserError } from "./errors.ts"; +export type { OnBeforeConnectOptions } from "./actor.ts"; +export { Connection } from "./connection.ts"; export { Rpc } from "./rpc.ts"; -export { Connection } from "./connection.ts"; diff --git a/sdks/actors/runtime/src/utils.ts b/sdks/actors/runtime/src/utils.ts index 9409e829a4..e2af96e180 100644 --- a/sdks/actors/runtime/src/utils.ts +++ b/sdks/actors/runtime/src/utils.ts @@ -1,3 +1,5 @@ +import * as errors from "./errors.ts"; + export function assertUnreachable(x: never): never { - throw new Error(`Unreachable case: ${x}`); + throw new errors.Unreachable(x); }