diff --git a/packages/api/src/agent.ts b/packages/api/src/agent.ts index 127936e58c4..63f33ea33f4 100644 --- a/packages/api/src/agent.ts +++ b/packages/api/src/agent.ts @@ -1,14 +1,16 @@ import { AtpClient } from './client' import { BSKY_LABELER_DID } from './const' -import { AtpDispatcher } from './dispatcher/atp-dispatcher' +import { SessionManager, isSessionManager } from './session/session-manager' import { - StatelessDispatcher, - StatelessDispatcherOptions, -} from './dispatcher/stateless-dispatcher' + StatelessSessionManager, + StatelessSessionManagerOptions, +} from './session/stateless-session-handler' import { AtpAgentGlobalOpts, AtprotoServiceType } from './types' const MAX_LABELERS = 10 +export type AtpAgentOptions = SessionManager | StatelessSessionManagerOptions + export class AtpAgent { /** * The labelers to be used across all requests with the takedown capability @@ -28,19 +30,21 @@ export class AtpAgent { labelersHeader: string[] = [] proxyHeader?: string - readonly dispatcher: AtpDispatcher + readonly sessionManager: SessionManager get com() { return this.api.com } - constructor(options: AtpDispatcher | StatelessDispatcherOptions) { - this.dispatcher = - options instanceof AtpDispatcher - ? options - : new StatelessDispatcher(options) + constructor(options: AtpAgentOptions) { + this.sessionManager = isSessionManager(options) + ? options + : new StatelessSessionManager(options) - this.api = new AtpClient(this.dispatcher) + this.api = new AtpClient((...args) => + // The function needs to be "bound" to the right context + this.sessionManager.fetchHandler(...args), + ) this.api.setHeader('atproto-accept-labelers', () => // Make sure to read the static property from the subclass in case it was // overridden. @@ -54,7 +58,7 @@ export class AtpAgent { } clone() { - const inst = new AtpAgent(this.dispatcher) + const inst = new AtpAgent(this.sessionManager) this.copyInto(inst) return inst } @@ -70,16 +74,16 @@ export class AtpAgent { return inst } + /** @deprecated only used for a very particular use-case in the official Bluesky app */ async getServiceUrl(): Promise { - // Clone to prevent mutation of the original dispatcher's URL - return this.dispatcher.getServiceUrl() + return this.sessionManager.getServiceUrl() } /** * Get the active session's DID */ async getDid(): Promise { - return this.dispatcher.getDid() + return this.sessionManager.getDid() } /** diff --git a/packages/api/src/bsky-agent.ts b/packages/api/src/bsky-agent.ts index c361edcaa4e..c4461866b41 100644 --- a/packages/api/src/bsky-agent.ts +++ b/packages/api/src/bsky-agent.ts @@ -51,7 +51,7 @@ declare global { export class BskyAgent extends AtpAgent { clone() { - const inst = new BskyAgent(this.dispatcher) + const inst = new BskyAgent(this.sessionManager) this.copyInto(inst) return inst } diff --git a/packages/api/src/client/index.ts b/packages/api/src/client/index.ts index aaacca73c3c..cc573ad9d45 100644 --- a/packages/api/src/client/index.ts +++ b/packages/api/src/client/index.ts @@ -1,11 +1,7 @@ /** * GENERATED CODE - DO NOT MODIFY */ -import { - XrpcClient, - XrpcDispatcher, - XrpcDispatcherOptions, -} from '@atproto/xrpc' +import { XrpcClient, FetchHandler, FetchHandlerOptions } from '@atproto/xrpc' import { schemas } from './lexicons' import { CID } from 'multiformats/cid' import * as ComAtprotoAdminDefs from './types/com/atproto/admin/defs' @@ -369,7 +365,7 @@ export class AtpClient extends XrpcClient { app: AppNS tools: ToolsNS - constructor(options: XrpcDispatcher | XrpcDispatcherOptions) { + constructor(options: FetchHandler | FetchHandlerOptions) { super(options, schemas) this.com = new ComNS(this) this.app = new AppNS(this) diff --git a/packages/api/src/dispatcher/atp-dispatcher.ts b/packages/api/src/dispatcher/atp-dispatcher.ts deleted file mode 100644 index 1cc44154ee7..00000000000 --- a/packages/api/src/dispatcher/atp-dispatcher.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { XrpcDispatcher } from '@atproto/xrpc' - -export abstract class AtpDispatcher extends XrpcDispatcher { - abstract getDid(): string | PromiseLike - abstract getServiceUrl(): URL | PromiseLike -} diff --git a/packages/api/src/dispatcher/index.ts b/packages/api/src/dispatcher/index.ts deleted file mode 100644 index bb096d381ce..00000000000 --- a/packages/api/src/dispatcher/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from './atp-dispatcher' -export * from './session-dispatcher' -export * from './stateless-dispatcher' diff --git a/packages/api/src/dispatcher/stateless-dispatcher.ts b/packages/api/src/dispatcher/stateless-dispatcher.ts deleted file mode 100644 index b81c6e911a9..00000000000 --- a/packages/api/src/dispatcher/stateless-dispatcher.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { AtpDispatcher } from './atp-dispatcher' - -export type StatelessDispatcherOptions = { - service: string | URL - headers?: { [_ in string]?: null | string } -} - -export class StatelessDispatcher extends AtpDispatcher { - getServiceUrl: () => URL | PromiseLike - - constructor({ service, headers }: StatelessDispatcherOptions) { - super({ service, headers }) - this.getServiceUrl = () => new URL(service) - } - - async getDid(): Promise { - throw new Error('Not logged in') - } -} diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index ef4feced23b..b53897b3f6d 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -11,7 +11,7 @@ export * from './types' export * from './const' export * from './util' export * from './client' -export * from './dispatcher' +export * from './session' export * from './rich-text/rich-text' export * from './rich-text/sanitization' export * from './rich-text/unicode' diff --git a/packages/api/src/dispatcher/session-dispatcher.ts b/packages/api/src/session/atp-session-manager.ts similarity index 92% rename from packages/api/src/dispatcher/session-dispatcher.ts rename to packages/api/src/session/atp-session-manager.ts index cc329c292b5..39ebcee89c0 100644 --- a/packages/api/src/dispatcher/session-dispatcher.ts +++ b/packages/api/src/session/atp-session-manager.ts @@ -16,24 +16,24 @@ import { AtpPersistSessionHandler, AtpSessionData, } from '../types' -import { AtpDispatcher } from './atp-dispatcher' +import { SessionManager } from './session-manager' const ReadableStream = globalThis.ReadableStream as | typeof globalThis.ReadableStream | undefined export type Fetch = (this: void, request: Request) => Promise -export interface SessionDispatcherOptions { +export interface AtpSessionManagerOptions { service: string | URL persistSession?: AtpPersistSessionHandler fetch?: Fetch } /** - * An {@link XrpcDispatcher} that uses legacy "com.atproto.server" endpoints to + * A {@link SessionManager} that uses legacy "com.atproto.server" endpoints to * manage sessions and route XRPC requests. */ -export class SessionDispatcher extends AtpDispatcher { +export class AtpSessionManager implements SessionManager { public serviceUrl: URL public pdsUrl?: URL // The PDS URL, driven by the did doc public session?: AtpSessionData @@ -43,9 +43,7 @@ export class SessionDispatcher extends AtpDispatcher { private persistSession?: AtpPersistSessionHandler private refreshSessionPromise: Promise | undefined - constructor(options: SessionDispatcherOptions) { - super((url, init) => this._dispatch(url, init)) - + constructor(options: AtpSessionManagerOptions) { this.serviceUrl = new URL(options.service) this.fetch = options.fetch || globalThis.fetch this.setPersistSessionHandler(options.persistSession) @@ -71,8 +69,7 @@ export class SessionDispatcher extends AtpDispatcher { } /** - * Internal fetch method that will be triggered by the XRPC Dispatcher (parent - * class). This method will: + * fetch method that will be triggered by the ApiClient. This method will: * - Set the proper origin for the request (pds or service) * - Add the proper auth headers to the request * - Handle session refreshes @@ -80,10 +77,7 @@ export class SessionDispatcher extends AtpDispatcher { * @note We define this as a method on the prototype instead of inlining the * function in the constructor for readability. */ - protected async _dispatch( - url: string, - reqInit: RequestInit, - ): Promise { + async fetchHandler(url: string, reqInit: RequestInit): Promise { // wait for any active session-refreshes to finish await this.refreshSessionPromise @@ -251,11 +245,12 @@ export class SessionDispatcher extends AtpDispatcher { ): Promise { try { this.session = session - // For this particular call, we want this._dispatch() to be used in order - // to refresh the session if needed. To do so, we use a (new) AtpClient - // instance to build the HTTP request, and pass "this" as the dispatcher - // so that this._dispatch() is called. - const res = await new AtpClient(this).com.atproto.server.getSession() + // For this particular call, we want this.fetchHandler() to be used in + // order to refresh the session if needed. So let's create a new client + // instance with the right fetchHandler. + const res = await new AtpClient( + this.fetchHandler, + ).com.atproto.server.getSession() if (res.data.did !== this.session.did) { throw new XRPCError( ResponseType.InvalidRequest, diff --git a/packages/api/src/session/index.ts b/packages/api/src/session/index.ts new file mode 100644 index 00000000000..3759556de9b --- /dev/null +++ b/packages/api/src/session/index.ts @@ -0,0 +1,3 @@ +export * from './session-manager' +export * from './atp-session-manager' +export * from './stateless-session-handler' diff --git a/packages/api/src/session/session-manager.ts b/packages/api/src/session/session-manager.ts new file mode 100644 index 00000000000..70fcd889e8e --- /dev/null +++ b/packages/api/src/session/session-manager.ts @@ -0,0 +1,20 @@ +export interface SessionManager { + fetchHandler(url: string, reqInit: RequestInit): Promise + getDid(): string | PromiseLike + + /** @deprecated only used for a very particular use-case in the official Bluesky app */ + getServiceUrl(): URL | PromiseLike +} + +export function isSessionManager(value: T): value is T & SessionManager { + return ( + value !== null && + typeof value === 'object' && + 'fetchHandler' in value && + typeof value.fetchHandler === 'function' && + 'getDid' in value && + typeof value.getDid === 'function' && + 'getServiceUrl' in value && + typeof value.getServiceUrl === 'function' + ) +} diff --git a/packages/api/src/session/stateless-session-handler.ts b/packages/api/src/session/stateless-session-handler.ts new file mode 100644 index 00000000000..c4757afc2da --- /dev/null +++ b/packages/api/src/session/stateless-session-handler.ts @@ -0,0 +1,43 @@ +import { SessionManager } from './session-manager' + +export type StatelessSessionManagerOptions = { + service: string | URL + headers?: { [_ in string]?: null | string } +} + +export class StatelessSessionManager implements SessionManager { + readonly serviceUrl: URL + readonly headers: Map + + constructor({ service, headers }: StatelessSessionManagerOptions) { + this.headers = new Map( + headers + ? Object.entries(headers).filter( + ( + e: T, + ): e is T & [T[0], NonNullable] => e[1] != null, + ) + : headers, + ) + this.serviceUrl = new URL(service) + } + + async getServiceUrl(): Promise { + return this.serviceUrl + } + + async getDid(): Promise { + throw new Error('Not logged in') + } + + async fetchHandler(url: string, reqInit: RequestInit): Promise { + const fullUrl = new URL(url, this.serviceUrl) + const headers = new Headers(reqInit.headers) + + for (const [key, value] of this.headers) { + if (value != null) headers.set(key, value) + } + + return globalThis.fetch(fullUrl.toString(), { ...reqInit, headers }) + } +} diff --git a/packages/dev-env/src/agent.ts b/packages/dev-env/src/agent.ts index 2cc359e1fbb..550624afb62 100644 --- a/packages/dev-env/src/agent.ts +++ b/packages/dev-env/src/agent.ts @@ -1,36 +1,36 @@ import AtpAgent, { - SessionDispatcher, - SessionDispatcherOptions, + AtpSessionManager, + AtpSessionManagerOptions, } from '@atproto/api' import { EXAMPLE_LABELER } from './const' export class TestAgent extends AtpAgent { - readonly dispatcher: SessionDispatcher + readonly sessionManager: AtpSessionManager - constructor(options: SessionDispatcherOptions) { - const dispatcher = new SessionDispatcher(options) - super(dispatcher) - this.dispatcher = dispatcher + constructor(options: AtpSessionManagerOptions) { + const sessionManager = new AtpSessionManager(options) + super(sessionManager) + this.sessionManager = sessionManager this.configureLabelersHeader([EXAMPLE_LABELER]) } get session() { - return this.dispatcher.session + return this.sessionManager.session } get hasSession() { - return this.dispatcher.hasSession + return this.sessionManager.hasSession } get service() { - return this.dispatcher.serviceUrl + return this.sessionManager.serviceUrl } - login(...args: Parameters) { - return this.dispatcher.login(...args) + login(...args: Parameters) { + return this.sessionManager.login(...args) } - createAccount(...args: Parameters) { - return this.dispatcher.createAccount(...args) + createAccount(...args: Parameters) { + return this.sessionManager.createAccount(...args) } } diff --git a/packages/lex-cli/src/codegen/client.ts b/packages/lex-cli/src/codegen/client.ts index 94cb6cdf738..53daf6dd597 100644 --- a/packages/lex-cli/src/codegen/client.ts +++ b/packages/lex-cli/src/codegen/client.ts @@ -61,14 +61,14 @@ const indexTs = ( nsidTokens: Record, ) => gen(project, '/index.ts', async (file) => { - //= import { XrpcClient, XrpcDispatcher, XrpcDispatcherOptions } from '@atproto/xrpc' + //= import { XrpcClient, FetchHandler, FetchHandlerOptions } from '@atproto/xrpc' const xrpcImport = file.addImportDeclaration({ moduleSpecifier: '@atproto/xrpc', }) xrpcImport.addNamedImports([ { name: 'XrpcClient' }, - { name: 'XrpcDispatcher' }, - { name: 'XrpcDispatcherOptions' }, + { name: 'FetchHandler' }, + { name: 'FetchHandlerOptions' }, ]) //= import {schemas} from './lexicons' file @@ -131,13 +131,13 @@ const indexTs = ( }) } - //= constructor (options: XrpcDispatcher | XrpcDispatcherOptions) { + //= constructor (options: FetchHandler | FetchHandlerOptions) { //= super(options, schemas) //= {namespace declarations} //= } atpClientCls.addConstructor({ parameters: [ - { name: 'options', type: 'XrpcDispatcher | XrpcDispatcherOptions' }, + { name: 'options', type: 'FetchHandler | FetchHandlerOptions' }, ], statements: [ 'super(options, schemas)', diff --git a/packages/xrpc/src/fetch-handler.ts b/packages/xrpc/src/fetch-handler.ts new file mode 100644 index 00000000000..fa7f0564ad8 --- /dev/null +++ b/packages/xrpc/src/fetch-handler.ts @@ -0,0 +1,83 @@ +import { Gettable } from './types' + +export type FetchHandler = ( + this: void, + /** + * The URL (pathname + query parameters) to make the request to, without the + * origin. The origin (protocol, hostname, and port) must be added by this + * {@link FetchHandler}, typically based on authentication or other factors. + */ + url: string, + init: RequestInit, +) => Promise + +export type FetchHandlerOptions = BuildFetchHandlerOptions | string | URL + +export type BuildFetchHandlerOptions = { + /** + * The service URL to make requests to. This can be a string, URL, or a + * function that returns a string or URL. This is useful for dynamic URLs, + * such as a service URL that changes based on authentication. + */ + service: Gettable + + /** + * Headers to be added to every request. If a function is provided, it will be + * called on each request to get the headers. This is useful for dynamic + * headers, such as authentication tokens that may expire. + */ + headers?: + | { [_ in string]?: Gettable } + | (() => Iterable< + [name: string, value: string, options?: { override?: boolean }] + >) + + /** + * Bring your own fetch implementation. Typically useful for testing, logging, + * mocking, or adding retries, session management, signatures, proof of + * possession (DPoP), etc. Defaults to the global `fetch` function. + */ + fetch?: typeof globalThis.fetch +} + +export function buildFetchHandler(options: FetchHandlerOptions): FetchHandler { + const { + service, + headers: inputHeaders = undefined, + fetch = globalThis.fetch, + } = typeof options === 'string' || options instanceof URL + ? { service: options } + : options + + if (typeof fetch !== 'function') { + throw new TypeError( + 'XrpcDispatcher requires fetch() to be available in your environment.', + ) + } + + return async function (url, init) { + const base = typeof service === 'function' ? await service() : service + const fullUrl = new URL(url, base) + + const headers = new Headers(init.headers) + + if (typeof inputHeaders === 'function') { + for (const [key, value, options = undefined] of inputHeaders()) { + if (options?.override ?? !headers.has(key)) { + headers.set(key, value) + } + } + } else if (inputHeaders) { + for (const [key, getter] of Object.entries(inputHeaders)) { + if (headers.has(key)) continue + + const value = typeof getter === 'function' ? await getter() : getter + if (value == null) continue + + headers.set(key, value) + } + } + + return fetch(fullUrl, { ...init, headers }) + } +} diff --git a/packages/xrpc/src/index.ts b/packages/xrpc/src/index.ts index 0a385446498..25289bed3dc 100644 --- a/packages/xrpc/src/index.ts +++ b/packages/xrpc/src/index.ts @@ -1,6 +1,6 @@ export * from './client' +export * from './fetch-handler' export * from './types' -export * from './xrpc-dispatcher' export * from './xrpc-client' import { Client } from './client' diff --git a/packages/xrpc/src/xrpc-client.ts b/packages/xrpc/src/xrpc-client.ts index c50fd39310b..b7f4eeec156 100644 --- a/packages/xrpc/src/xrpc-client.ts +++ b/packages/xrpc/src/xrpc-client.ts @@ -17,22 +17,27 @@ import { httpResponseBodyParse, isErrorResponseBody, } from './util' -import { XrpcDispatcher, XrpcDispatcherOptions } from './xrpc-dispatcher' +import { + FetchHandler, + FetchHandlerOptions, + buildFetchHandler, +} from './fetch-handler' export class XrpcClient { - readonly dispatcher: XrpcDispatcher + readonly fetchHandler: FetchHandler readonly lex: Lexicons protected headers = new Map>() constructor( - dispatcher: XrpcDispatcher | XrpcDispatcherOptions, - lex?: Lexicons | Iterable, + fetchHandler: FetchHandler | FetchHandlerOptions, + lex: Lexicons | Iterable, ) { - this.dispatcher = - dispatcher instanceof XrpcDispatcher - ? dispatcher - : new XrpcDispatcher(dispatcher) + this.fetchHandler = + typeof fetchHandler === 'function' + ? fetchHandler + : buildFetchHandler(fetchHandler) + this.lex = lex instanceof Lexicons ? lex : new Lexicons(lex) } @@ -80,7 +85,7 @@ export class XrpcClient { } try { - const response = await this.dispatcher.dispatch(reqUrl, init) + const response = await this.fetchHandler.call(undefined, reqUrl, init) const resStatus = response.status const resHeaders = Object.fromEntries(response.headers.entries()) diff --git a/packages/xrpc/src/xrpc-dispatcher.ts b/packages/xrpc/src/xrpc-dispatcher.ts deleted file mode 100644 index 5c1e756a1a8..00000000000 --- a/packages/xrpc/src/xrpc-dispatcher.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { Gettable } from './types' - -export type Fetch = (request: Request) => Promise -export type Dispatch = ( - /** - * The URL (pathname + query parameters) to make the request to, without the - * origin. The origin (protocol, hostname, and port) must be added by this - * {@link FetchHandler}, typically based on authentication or other factors. - */ - url: string, - init: RequestInit, -) => Promise - -export type XrpcDispatcherOptions = - | Dispatch - | BuildDispatchOptions - | string - | URL - -/** - * Default {@link FetchAgent} implementation that uses WHATWG's `fetch` API and - * no authentication. This class would typically be extended to add authentication - * or other features (retry, session management, etc.). - * - * @example - * ```ts - * class MyDispatcher extends XrpcDispatcher { - * constructor( - * public serviceUri: string | URL, - * public bearer?: string, - * ) { - * super({ - * service: serviceUri - * headers: () => ({ authorization: `Bearer ${this.bearer}` }), - * }) - * } - * } - * - * const client = new XrpcClient(new MyDispatcher('https://example.com', 'my-token')) - * ``` - * - * @example - * ```ts - * class MyDispatcher extends XrpcDispatcher { - * constructor( - * public serviceUri: string | URL, - * public bearer?: string, - * ) { - * super((url, init) => { - * const uri = new URL(url, this.serviceUri) - * const request = new Request(uri, init) - * if (this.bearer) { - * request.headers.set('Authorization', `Bearer ${this.bearer}`) - * } - * return globalThis.fetch(request) - * }) - * } - * } - * - * const client = new XrpcClient(new MyDispatcher('https://example.com', 'my-token')) - * ``` - */ -export class XrpcDispatcher { - public readonly dispatch: Dispatch - constructor(options: XrpcDispatcherOptions) { - this.dispatch = buildDispatch(options).bind(this) - } -} - -export type BuildDispatchOptions = { - /** - * The service URL to make requests to. This can be a string, URL, or a - * function that returns a string or URL. This is useful for dynamic URLs, - * such as a service URL that changes based on authentication. - */ - service: Gettable - - /** - * Headers to be added to every request. If a function is provided, it will be - * called on each request to get the headers. This is useful for dynamic - * headers, such as authentication tokens that may expire. - */ - headers?: - | { [_ in string]?: Gettable } - | (() => Iterable< - [name: string, value: string, options?: { override?: boolean }] - >) - - /** - * Bring your own fetch implementation. Typically useful for testing, logging, - * mocking, or adding retries, session management, signatures, proof of - * possession (DPoP), etc. Defaults to the global `fetch` function. - */ - fetch?: (request: Request) => Promise -} - -export function buildDispatch(options: XrpcDispatcherOptions): Dispatch { - if (typeof options === 'function') return options - - const { - service, - headers = undefined, - fetch = globalThis.fetch, - } = typeof options === 'string' || options instanceof URL - ? { service: options } - : options - - if (typeof fetch !== 'function') { - throw new TypeError( - 'XrpcDispatcher requires fetch() to be available in your environment.', - ) - } - - return async function (url, init) { - const base = typeof service === 'function' ? await service() : service - const request = new Request(new URL(url, base), init) - if (typeof headers === 'function') { - for (const [key, value, options = undefined] of headers()) { - if (options?.override ?? !request.headers.has(key)) { - request.headers.set(key, value) - } - } - } else if (headers) { - for (const [key, getter] of Object.entries(headers)) { - if (request.headers.has(key)) continue - - const value = typeof getter === 'function' ? await getter() : getter - if (value == null) continue - - request.headers.set(key, value) - } - } - return fetch(request) - } -}