Skip to content

Commit

Permalink
chore: finalize query syntax for manager
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Dec 12, 2024
1 parent c704d17 commit 8fdf21d
Show file tree
Hide file tree
Showing 23 changed files with 424 additions and 266 deletions.
4 changes: 2 additions & 2 deletions examples/counter/counter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Actor } from "@rivet-gg/actors";
import type { Rpc, OnBeforeConnectOpts } from "@rivet-gg/actors";
import type { Rpc, OnBeforeConnectOptions } from "@rivet-gg/actors";

interface State {
count: number;
Expand All @@ -22,7 +22,7 @@ export default class Counter extends Actor<
return { count: 0 };
}

override _onBeforeConnect(opts: OnBeforeConnectOpts<ConnParams>): ConnState {
override _onBeforeConnect(opts: OnBeforeConnectOptions<ConnParams>): ConnState {
this._log.info("parameters", { params: opts.parameters });
return { mod: opts.parameters?.mod ?? 1 };
}
Expand Down
3 changes: 2 additions & 1 deletion examples/counter/deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"hono": "jsr:@hono/hono@^4.6.12",
"@rivet-gg/actors-core": "../../sdks/actors/core/src/mod.ts",
"@rivet-gg/actors": "../../sdks/actors/runtime/src/mod.ts",
"on-change": "npm:on-change@^5.0.1"
"on-change": "npm:on-change@^5.0.1",
"zod": "npm:zod@^3.24.1"
},
"fmt": {
"useTabs": true
Expand Down
9 changes: 7 additions & 2 deletions examples/counter/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

163 changes: 113 additions & 50 deletions sdks/actors/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,28 @@ import type {
RivetConfigResponse,
} from "../../manager-protocol/src/mod.ts";
import type { CreateRequest } from "../../manager-protocol/src/query.ts";
import type { AnyActor } from "../../runtime/src/actor.ts";
import * as errors from "./errors.ts";
import { ActorHandleRaw } from "./handle.ts";
import { logger } from "./log.ts";
import * as errors from "./errors.ts";

export interface GetOpts {
export interface QueryOptions {
/** Parameters to pass to the connection. */
parameters?: unknown;
create?: CreateRequest;
}

export interface GetWithIdOptions extends QueryOptions {}

export interface GetOptions extends QueryOptions {
/** Prevents creating a new actor if one does not exist. */
noCreate?: boolean;
/** Config used to create the actor. */
create?: Partial<CreateRequest>;
}

export interface CreateOptions extends QueryOptions {
/** Config used to create the actor. */
create: CreateRequest;
}

/**
Expand All @@ -20,17 +35,15 @@ export interface GetOpts {
*
* Private methods (e.g. those starting with `_`) are automatically excluded.
*/
export type ActorHandle<A = unknown> =
& ActorHandleRaw
& {
[
K in keyof A as K extends string ? K extends `_${string}` ? never
: K
: K
]: A[K] extends (...args: infer Args) => infer Return
? ActorRPCFunction<Args, Return>
: never;
};
export type ActorHandle<A = unknown> = ActorHandleRaw & {
[K in keyof A as K extends string
? K extends `_${string}`
? never
: K
: K]: A[K] extends (...args: infer Args) => infer Return
? ActorRPCFunction<Args, Return>
: never;
};

/**
* RPC function returned by the actor proxy. This will call `ActorHandle.rpc`
Expand Down Expand Up @@ -61,22 +74,103 @@ export class Client {
} else {
// Convert to promise
this.#managerEndpointPromise = new Promise((resolve) =>
resolve(managerEndpointPromise)
resolve(managerEndpointPromise),
);
}

this.#regionPromise = this.#fetchRegion();
}

async get<A = unknown>(
async getWithId<A extends AnyActor = AnyActor>(
actorId: string,
opts?: GetWithIdOptions,
) {
logger().debug("get actor with id ", {
actorId,
parameters: opts?.parameters,
});

const resJson = await this.#sendManagerRequest<
ActorsRequest,
ActorsResponse
>("POST", "/actors", {
query: {
getForId: {
actorId,
},
},
});

const handle = this.#createHandle(resJson.endpoint, opts?.parameters);
return this.#createProxy(handle) as ActorHandle<A>;
}

async get<A extends AnyActor = AnyActor>(
tags: ActorTags,
opts?: GetOpts,
opts?: GetOptions,
): Promise<ActorHandle<A>> {
logger().debug("get actor", { tags, opts });
const handle = await this.#createHandle(tags, opts);
if (!("name" in tags)) throw new Error("Tags must contain name");

// Build create config
let create: CreateRequest | undefined = undefined;
if (!opts?.noCreate) {
create = create ?? {
// Default to the same tags as the request
tags: tags,
};

// Default to the chosen region
if (!create.region) create.region = (await this.#regionPromise)?.id;
}

logger().debug("get actor", { tags, parameters: opts?.parameters, create });

const resJson = await this.#sendManagerRequest<
ActorsRequest,
ActorsResponse
>("POST", "/actors", {
query: {
getOrCreateForTags: {
tags,
create,
},
},
});

const handle = this.#createHandle(resJson.endpoint, opts?.parameters);
return this.#createProxy(handle) as ActorHandle<A>;
}

async create<A extends AnyActor = AnyActor>(
opts: CreateOptions,
): Promise<ActorHandle<A>> {
// Build create config
const create = opts.create;

// Default to the chosen region
if (!create.region) create.region = (await this.#regionPromise)?.id;

logger().debug("create actor", { parameters: opts?.parameters, create });

const resJson = await this.#sendManagerRequest<
ActorsRequest,
ActorsResponse
>("POST", "/actors", {
query: {
create,
},
});

const handle = this.#createHandle(resJson.endpoint, opts?.parameters);
return this.#createProxy(handle) as ActorHandle<A>;
}

#createHandle(endpoint: string, parameters: unknown): ActorHandleRaw {
const handle = new ActorHandleRaw(endpoint, parameters);
handle.connect();
return handle;
}

#createProxy(handle: ActorHandleRaw): ActorHandle {
// Stores returned RPC functions for faster calls
const methodCache = new Map<string, ActorRPCFunction>();
Expand Down Expand Up @@ -154,36 +248,6 @@ export class Client {
}) as ActorHandle;
}

async #createHandle(
tags: ActorTags,
opts?: GetOpts,
): Promise<ActorHandleRaw> {
const create = opts?.create ?? {
tags,
buildTags: {
name: tags.name,
current: "true",
},
region: (await this.#regionPromise)?.id,
};

const resJson = await this.#sendManagerRequest<
ActorsRequest,
ActorsResponse
>("POST", "/actors", {
query: {
getOrCreate: {
tags,
create,
},
},
});

const handle = new ActorHandleRaw(resJson.endpoint, opts?.parameters);
handle.connect();
return handle;
}

/** Sends an HTTP request to the manager actor. */
async #sendManagerRequest<Request, Response>(
method: string,
Expand All @@ -207,7 +271,6 @@ export class Client {
return res.json();
} catch (error) {
throw new errors.ManagerError(String(error), { cause: error });

}
}

Expand Down
17 changes: 10 additions & 7 deletions sdks/actors/client/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import { MAX_CONN_PARAMS_SIZE } from "../../common/src/network.ts";

export class ActorClientError extends Error {
export class ActorClientError extends Error {}

}

export class InternalError extends ActorClientError {
}
export class InternalError extends ActorClientError {}

export class ManagerError extends ActorClientError {
constructor(error: string, opts?: ErrorOptions) {
Expand All @@ -15,7 +12,9 @@ export class ManagerError extends ActorClientError {

export class ConnectionParametersTooLong extends ActorClientError {
constructor() {
super(`Connection parameters must be less than ${MAX_CONN_PARAMS_SIZE} bytes`);
super(
`Connection parameters must be less than ${MAX_CONN_PARAMS_SIZE} bytes`,
);
}
}

Expand All @@ -26,7 +25,11 @@ export class MalformedResponseMessage extends ActorClientError {
}

export class RpcError extends ActorClientError {
constructor(public readonly code: string, message: string, public readonly metadata?: unknown) {
constructor(
public readonly code: string,
message: string,
public readonly metadata?: unknown,
) {
super(message);
}
}
48 changes: 27 additions & 21 deletions sdks/actors/client/src/handle.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { assertEquals } from "@std/assert";
import { MAX_CONN_PARAMS_SIZE } from "../../common/src/network.ts";
import { assertUnreachable } from "../../common/src/utils.ts";
import * as wsToClient from "../../protocol/src/ws/to_client.ts";
import type * as wsToClient from "../../protocol/src/ws/to_client.ts";
import type * as wsToServer from "../../protocol/src/ws/to_server.ts";
import { logger } from "./log.ts";
import * as errors from "./errors.ts";
import { logger } from "./log.ts";

interface RpcInFlight {
resolve: (response: wsToClient.RpcResponseOk) => void;
Expand Down Expand Up @@ -49,22 +49,22 @@ export class ActorHandleRaw {

const requestId = crypto.randomUUID();

const { promise: resolvePromise, resolve, reject } = Promise.withResolvers<
wsToClient.RpcResponseOk
>();
const {
promise: resolvePromise,
resolve,
reject,
} = Promise.withResolvers<wsToClient.RpcResponseOk>();
this.#websocketRpcInFlight.set(requestId, { resolve, reject });

this.#webSocketSend(
{
body: {
rpcRequest: {
id: requestId,
name,
args,
},
this.#webSocketSend({
body: {
rpcRequest: {
id: requestId,
name,
args,
},
} satisfies wsToServer.ToServer,
);
},
} satisfies wsToServer.ToServer);

// TODO: Throw error if disconnect is called

Expand Down Expand Up @@ -167,15 +167,20 @@ export class ActorHandleRaw {
response.body.rpcResponseError.id,
);
inFlight.reject(
new errors.RpcError(response.body.rpcResponseError.code, response.body.rpcResponseError.message, response.body.rpcResponseError.metadata)
new errors.RpcError(
response.body.rpcResponseError.code,
response.body.rpcResponseError.message,
response.body.rpcResponseError.metadata,
),
);
} else if ("event" in response.body) {
this.#dispatchEvent(response.body.event);
} else if ("error" in response.body) {
logger().warn(
"unhandled error from actor",
{ code: response.body.error.code, message: response.body.error.message, metadata: response.body.error.metadata }
);
logger().warn("unhandled error from actor", {
code: response.body.error.code,
message: response.body.error.message,
metadata: response.body.error.metadata,
});
} else {
assertUnreachable(response.body);
}
Expand All @@ -184,7 +189,8 @@ export class ActorHandleRaw {

#takeRpcInFlight(id: string): RpcInFlight {
const inFlight = this.#websocketRpcInFlight.get(id);
if (!inFlight) throw new errors.InternalError(`No in flight response for ${id}`);
if (!inFlight)
throw new errors.InternalError(`No in flight response for ${id}`);
this.#websocketRpcInFlight.delete(id);
return inFlight;
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/actors/client/src/test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Client } from "./mod.ts";
import { setupLogging } from "../../common/src/log.ts";
import { InternalError } from "./errors.ts";
import { Client } from "./mod.ts";

/**
* Uses the Rivet CLI to read the manager endpoint to connect to. This allows
Expand Down
Loading

0 comments on commit 8fdf21d

Please sign in to comment.