diff --git a/src/client/client.test.ts b/src/client/client.test.ts index 69f59ca18..3ec13f2cf 100644 --- a/src/client/client.test.ts +++ b/src/client/client.test.ts @@ -11,6 +11,8 @@ import type { Equal, Expect, JSONValue, SimplifyDeepArray } from '../utils/types import { validator } from '../validator' import { hc } from './client' import type { ClientResponse, InferRequestType, InferResponseType } from './types' +import { streamSSE } from '../helper/streaming' +import { EventSource, EventSourceInit } from './eventsource' describe('Basic - JSON', () => { const app = new Hono() @@ -1335,3 +1337,36 @@ describe('WebSocket Provider Integration', () => { expect(webSocketMock).toHaveBeenCalledWith(expectedUrl, undefined) }) }) + +describe('SSE Provider Integration', () => { + it('should initialize the SSE provider correctly', async () => { + const app = new Hono() + + const route = app.get('/sse-endpoint', (c) => + streamSSE(c, async (stream) => { + await stream.writeSSE({ + data: 'Hello from server!', + event: 'message', + }) + }) + ) + + type AppType = typeof route + + const client = hc('http://localhost') + assertType<(args: EventSourceInit) => EventSource>(client['sse-endpoint'].$sse) + const es = client['sse-endpoint'].$sse() + + expect(es.url).toBe('http://localhost/sse-endpoint') + + const eventData = await new Promise((resolve) => { + es.onmessage = (data) => { + expect(data).toBeInstanceOf(MessageEvent) + resolve(data) + } + }) + + expect(eventData.data).toBe('Hello from server!') + expect(eventData.type).toBe('message') + }) +}) diff --git a/src/client/client.ts b/src/client/client.ts index 95f3a6fd0..5c2c780a2 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -2,6 +2,7 @@ import type { Hono } from '../hono' import type { FormValue, ValidationTargets } from '../types' import { serialize } from '../utils/cookie' import type { UnionToIntersection } from '../utils/types' +import { EventSource } from './eventsource' import type { Callback, Client, ClientRequestOptions } from './types' import { buildSearchParams, @@ -193,6 +194,9 @@ export const hc = >( return establishWebSocket(targetUrl.toString()) } + if (method === 'sse') { + return new EventSource(url, opts.args[0]) + } const req = new ClientRequestImpl(url, method) if (method) { diff --git a/src/client/eventsource-parser/errors.ts b/src/client/eventsource-parser/errors.ts new file mode 100644 index 000000000..ef42b0ea7 --- /dev/null +++ b/src/client/eventsource-parser/errors.ts @@ -0,0 +1,44 @@ +/** + * The type of error that occurred. + * @public + */ +export type ErrorType = 'invalid-retry' | 'unknown-field' + +/** + * Error thrown when encountering an issue during parsing. + * + * @public + */ +export class ParseError extends Error { + /** + * The type of error that occurred. + */ + type: ErrorType + + /** + * In the case of an unknown field encountered in the stream, this will be the field name. + */ + field?: string + + /** + * In the case of an unknown field encountered in the stream, this will be the value of the field. + */ + value?: string + + /** + * The line that caused the error, if available. + */ + line?: string + + constructor( + message: string, + options: { type: ErrorType; field?: string; value?: string; line?: string } + ) { + super(message) + this.name = 'ParseError' + this.type = options.type + this.field = options.field + this.value = options.value + this.line = options.line + } +} diff --git a/src/client/eventsource-parser/index.ts b/src/client/eventsource-parser/index.ts new file mode 100644 index 000000000..fa0173ec9 --- /dev/null +++ b/src/client/eventsource-parser/index.ts @@ -0,0 +1,11 @@ +/**! + * EventSource Parser v3.0.0 + * https://github.com/rexxars/eventsource-parser + * + * Copyright (c) 2024 Espen Hovlandsdal + * Licensed under the MIT license. + * https://github.com/rexxars/eventsource-parser/blob/main/LICENSE + */ +export { type ErrorType, ParseError } from './errors' +export { createParser } from './parse' +export type { EventSourceMessage, EventSourceParser, ParserCallbacks } from './types.ts' diff --git a/src/client/eventsource-parser/parse.ts b/src/client/eventsource-parser/parse.ts new file mode 100644 index 000000000..eb1ddaf88 --- /dev/null +++ b/src/client/eventsource-parser/parse.ts @@ -0,0 +1,214 @@ +/** + * EventSource/Server-Sent Events parser + * @see https://html.spec.whatwg.org/multipage/server-sent-events.html + */ +import { ParseError } from './errors' +import type { EventSourceParser, ParserCallbacks } from './types' + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +function noop(_arg: unknown) { + // intentional noop +} + +/** + * Creates a new EventSource parser. + * + * @param callbacks - Callbacks to invoke on different parsing events: + * - `onEvent` when a new event is parsed + * - `onError` when an error occurs + * - `onRetry` when a new reconnection interval has been sent from the server + * - `onComment` when a comment is encountered in the stream + * + * @returns A new EventSource parser, with `parse` and `reset` methods. + * @public + */ +export function createParser(callbacks: ParserCallbacks): EventSourceParser { + if (typeof callbacks === 'function') { + throw new TypeError( + '`callbacks` must be an object, got a function instead. Did you mean `{onEvent: fn}`?' + ) + } + + const { onEvent = noop, onError = noop, onRetry = noop, onComment } = callbacks + + let incompleteLine = '' + + let isFirstChunk = true + let id: string | undefined + let data = '' + let eventType = '' + + function feed(newChunk: string) { + // Strip any UTF8 byte order mark (BOM) at the start of the stream + const chunk = isFirstChunk ? newChunk.replace(/^\xEF\xBB\xBF/, '') : newChunk + + // If there was a previous incomplete line, append it to the new chunk, + // so we may process it together as a new (hopefully complete) chunk. + const [complete, incomplete] = splitLines(`${incompleteLine}${chunk}`) + + for (const line of complete) { + parseLine(line) + } + + incompleteLine = incomplete + isFirstChunk = false + } + + function parseLine(line: string) { + // If the line is empty (a blank line), dispatch the event + if (line === '') { + dispatchEvent() + return + } + + // If the line starts with a U+003A COLON character (:), ignore the line. + if (line.startsWith(':')) { + if (onComment) { + onComment(line.slice(line.startsWith(': ') ? 2 : 1)) + } + return + } + + // If the line contains a U+003A COLON character (:) + const fieldSeparatorIndex = line.indexOf(':') + if (fieldSeparatorIndex !== -1) { + // Collect the characters on the line before the first U+003A COLON character (:), + // and let `field` be that string. + const field = line.slice(0, fieldSeparatorIndex) + + // Collect the characters on the line after the first U+003A COLON character (:), + // and let `value` be that string. If value starts with a U+0020 SPACE character, + // remove it from value. + const offset = line[fieldSeparatorIndex + 1] === ' ' ? 2 : 1 + const value = line.slice(fieldSeparatorIndex + offset) + + processField(field, value, line) + return + } + + // Otherwise, the string is not empty but does not contain a U+003A COLON character (:) + // Process the field using the whole line as the field name, and an empty string as the field value. + // 👆 This is according to spec. That means that a line that has the value `data` will result in + // a newline being added to the current `data` buffer, for instance. + processField(line, '', line) + } + + function processField(field: string, value: string, line: string) { + // Field names must be compared literally, with no case folding performed. + switch (field) { + case 'event': + // Set the `event type` buffer to field value + eventType = value + break + case 'data': + // Append the field value to the `data` buffer, then append a single U+000A LINE FEED(LF) + // character to the `data` buffer. + data = `${data}${value}\n` + break + case 'id': + // If the field value does not contain U+0000 NULL, then set the `ID` buffer to + // the field value. Otherwise, ignore the field. + id = value.includes('\0') ? undefined : value + break + case 'retry': + // If the field value consists of only ASCII digits, then interpret the field value as an + // integer in base ten, and set the event stream's reconnection time to that integer. + // Otherwise, ignore the field. + if (/^\d+$/.test(value)) { + onRetry(parseInt(value, 10)) + } else { + onError( + new ParseError(`Invalid \`retry\` value: "${value}"`, { + type: 'invalid-retry', + value, + line, + }) + ) + } + break + default: + // Otherwise, the field is ignored. + onError( + new ParseError( + `Unknown field "${field.length > 20 ? `${field.slice(0, 20)}…` : field}"`, + { type: 'unknown-field', field, value, line } + ) + ) + break + } + } + + function dispatchEvent() { + const shouldDispatch = data.length > 0 + if (shouldDispatch) { + onEvent({ + id, + event: eventType || undefined, + // If the data buffer's last character is a U+000A LINE FEED (LF) character, + // then remove the last character from the data buffer. + data: data.endsWith('\n') ? data.slice(0, -1) : data, + }) + } + + // Reset for the next event + id = undefined + data = '' + eventType = '' + } + + function reset(options: { consume?: boolean } = {}) { + if (incompleteLine && options.consume) { + parseLine(incompleteLine) + } + + id = undefined + data = '' + eventType = '' + incompleteLine = '' + } + + return { feed, reset } +} + +/** + * For the given `chunk`, split it into lines according to spec, and return any remaining incomplete line. + * + * @param chunk - The chunk to split into lines + * @returns A tuple containing an array of complete lines, and any remaining incomplete line + * @internal + */ +function splitLines(chunk: string): [Array, string] { + /** + * According to the spec, a line is terminated by either: + * - U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair + * - a single U+000A LINE FEED(LF) character not preceded by a U+000D CARRIAGE RETURN(CR) character + * - a single U+000D CARRIAGE RETURN(CR) character not followed by a U+000A LINE FEED(LF) character + */ + + const lines: Array = [] + let incompleteLine = '' + + const totalLength = chunk.length + for (let i = 0; i < totalLength; i++) { + const char = chunk[i] + + if (char === '\r' && chunk[i + 1] === '\n') { + // CRLF + lines.push(incompleteLine) + incompleteLine = '' + i++ // Skip the LF character + } else if (char === '\r') { + // Standalone CR + lines.push(incompleteLine) + incompleteLine = '' + } else if (char === '\n') { + // Standalone LF + lines.push(incompleteLine) + incompleteLine = '' + } else { + incompleteLine += char + } + } + + return [lines, incompleteLine] +} diff --git a/src/client/eventsource-parser/stream.ts b/src/client/eventsource-parser/stream.ts new file mode 100644 index 000000000..1dc3a1bde --- /dev/null +++ b/src/client/eventsource-parser/stream.ts @@ -0,0 +1,88 @@ +import { createParser } from './parse' +import type { EventSourceMessage, EventSourceParser } from './types.ts' + +/** + * Options for the EventSourceParserStream. + * + * @public + */ +export interface StreamOptions { + /** + * Behavior when a parsing error occurs. + * + * - A custom function can be provided to handle the error. + * - `'terminate'` will error the stream and stop parsing. + * - Any other value will ignore the error and continue parsing. + * + * @defaultValue `undefined` + */ + onError?: 'terminate' | ((error: Error) => void) + + /** + * Callback for when a reconnection interval is sent from the server. + * + * @param retry - The number of milliseconds to wait before reconnecting. + */ + onRetry?: (retry: number) => void + + /** + * Callback for when a comment is encountered in the stream. + * + * @param comment - The comment encountered in the stream. + */ + onComment?: (comment: string) => void +} + +/** + * A TransformStream that ingests a stream of strings and produces a stream of `EventSourceMessage`. + * + * @example Basic usage + * ``` + * const eventStream = + * response.body + * .pipeThrough(new TextDecoderStream()) + * .pipeThrough(new EventSourceParserStream()) + * ``` + * + * @example Terminate stream on parsing errors + * ``` + * const eventStream = + * response.body + * .pipeThrough(new TextDecoderStream()) + * .pipeThrough(new EventSourceParserStream({terminateOnError: true})) + * ``` + * + * @public + */ +export class EventSourceParserStream extends TransformStream { + constructor({ onError, onRetry, onComment }: StreamOptions = {}) { + let parser!: EventSourceParser + + super({ + start(controller) { + parser = createParser({ + onEvent: (event) => { + controller.enqueue(event) + }, + onError(error) { + if (onError === 'terminate') { + controller.error(error) + } else if (typeof onError === 'function') { + onError(error) + } + + // Ignore by default + }, + onRetry, + onComment, + }) + }, + transform(chunk) { + parser.feed(chunk) + }, + }) + } +} + +export { type ErrorType, ParseError } from './errors' +export type { EventSourceMessage } from './types' diff --git a/src/client/eventsource-parser/types.ts b/src/client/eventsource-parser/types.ts new file mode 100644 index 000000000..aa6a1f919 --- /dev/null +++ b/src/client/eventsource-parser/types.ts @@ -0,0 +1,97 @@ +import type { ParseError } from './errors' + +/** + * EventSource parser instance. + * + * Needs to be reset between reconnections/when switching data source, using the `reset()` method. + * + * @public + */ +export interface EventSourceParser { + /** + * Feeds the parser another chunk. The method _does not_ return a parsed message. + * Instead, if the chunk was a complete message (or completed a previously incomplete message), + * it will invoke the `onParse` callback used to create the parsers. + * + * @param chunk - The chunk to parse. Can be a partial, eg in the case of streaming messages. + * @public + */ + feed(chunk: string): void + + /** + * Resets the parser state. This is required when you have a new stream of messages - + * for instance in the case of a client being disconnected and reconnecting. + * + * Previously received, incomplete data will NOT be parsed unless you pass `consume: true`, + * which tells the parser to attempt to consume any incomplete data as if it ended with a newline + * character. This is useful for cases when a server sends a non-EventSource message that you + * want to be able to react to in an `onError` callback. + * + * @public + */ + reset(options?: { consume?: boolean }): void +} + +/** + * A parsed EventSource message event + * + * @public + */ +export interface EventSourceMessage { + /** + * The event type sent from the server. Note that this differs from the browser `EventSource` + * implementation in that browsers will default this to `message`, whereas this parser will + * leave this as `undefined` if not explicitly declared. + */ + event?: string + + /** + * ID of the message, if any was provided by the server. Can be used by clients to keep the + * last received message ID in sync when reconnecting. + */ + id?: string + + /** + * The data received for this message + */ + data: string +} + +/** + * Callbacks that can be passed to the parser to handle different types of parsed messages + * and errors. + * + * @public + */ +export interface ParserCallbacks { + /** + * Callback for when a new event/message is parsed from the stream. + * This is the main callback that clients will use to handle incoming messages. + * + * @param event - The parsed event/message + */ + onEvent?: (event: EventSourceMessage) => void + + /** + * Callback for when the server sends a new reconnection interval through the `retry` field. + * + * @param retry - The number of milliseconds to wait before reconnecting. + */ + onRetry?: (retry: number) => void + + /** + * Callback for when a comment is encountered in the stream. + * + * @param comment - The comment encountered in the stream. + */ + onComment?: (comment: string) => void + + /** + * Callback for when an error occurs during parsing. This is a catch-all for any errors + * that occur during parsing, and can be used to handle them in a custom way. Most clients + * tend to silently ignore any errors and instead retry, but it can be helpful to log/debug. + * + * @param error - The error that occurred during parsing + */ + onError?: (error: ParseError) => void +} diff --git a/src/client/eventsource/errors.ts b/src/client/eventsource/errors.ts new file mode 100644 index 000000000..7f097571c --- /dev/null +++ b/src/client/eventsource/errors.ts @@ -0,0 +1,141 @@ +/** + * An extended version of the `Event` emitted by the `EventSource` object when an error occurs. + * While the spec does not include any additional properties, we intentionally go beyond the spec + * and provide some (minimal) additional information to aid in debugging. + * + * @public + */ +export class ErrorEvent extends Event { + /** + * HTTP status code, if this was triggered by an HTTP error + * Note: this is not part of the spec, but is included for better error handling. + * + * @public + */ + public code?: number | undefined + + /** + * Optional message attached to the error. + * Note: this is not part of the spec, but is included for better error handling. + * + * @public + */ + public message?: string | undefined + + /** + * Constructs a new `ErrorEvent` instance. This is typically not called directly, + * but rather emitted by the `EventSource` object when an error occurs. + * + * @param type - The type of the event (should be "error") + * @param errorEventInitDict - Optional properties to include in the error event + */ + constructor( + type: string, + errorEventInitDict?: { message?: string | undefined; code?: number | undefined } + ) { + super(type) + this.code = errorEventInitDict?.code ?? undefined + this.message = errorEventInitDict?.message ?? undefined + } + + /** + * Node.js "hides" the `message` and `code` properties of the `ErrorEvent` instance, + * when it is `console.log`'ed. This makes it harder to debug errors. To ease debugging, + * we explicitly include the properties in the `inspect` method. + * + * This is automatically called by Node.js when you `console.log` an instance of this class. + * + * @param _depth - The current depth + * @param options - The options passed to `util.inspect` + * @param inspect - The inspect function to use (prevents having to import it from `util`) + * @returns A string representation of the error + */ + [Symbol.for('nodejs.util.inspect.custom')]( + _depth: number, + options: { colors: boolean }, + inspect: (obj: unknown, inspectOptions: { colors: boolean }) => string + ): string { + return inspect(inspectableError(this), options) + } + + /** + * Deno "hides" the `message` and `code` properties of the `ErrorEvent` instance, + * when it is `console.log`'ed. This makes it harder to debug errors. To ease debugging, + * we explicitly include the properties in the `inspect` method. + * + * This is automatically called by Deno when you `console.log` an instance of this class. + * + * @param inspect - The inspect function to use (prevents having to import it from `util`) + * @param options - The options passed to `Deno.inspect` + * @returns A string representation of the error + */ + [Symbol.for('Deno.customInspect')]( + inspect: (obj: unknown, inspectOptions: { colors: boolean }) => string, + options: { colors: boolean } + ): string { + return inspect(inspectableError(this), options) + } +} + +/** + * For environments where DOMException may not exist, we will use a SyntaxError instead. + * While this isn't strictly according to spec, it is very close. + * + * @param message - The message to include in the error + * @returns A `DOMException` or `SyntaxError` instance + * @internal + */ +export function syntaxError(message: string): SyntaxError { + // If someone can figure out a way to make this work without depending on DOM/Node.js typings, + // and without casting to `any`, please send a PR 🙏 + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const DomException = (globalThis as any).DOMException + if (typeof DomException === 'function') { + return new DomException(message, 'SyntaxError') + } + + return new SyntaxError(message) +} + +/** + * Flatten an error into a single error message string. + * Unwraps nested errors and joins them with a comma. + * + * @param err - The error to flatten + * @returns A string representation of the error + * @internal + */ +export function flattenError(err: unknown): string { + if (!(err instanceof Error)) { + return `${err}` + } + + if ('errors' in err && Array.isArray(err.errors)) { + return err.errors.map(flattenError).join(', ') + } + + if ('cause' in err && err.cause instanceof Error) { + return `${err}: ${flattenError(err.cause)}` + } + + return err.message +} + +/** + * Convert an `ErrorEvent` instance into a plain object for inspection. + * + * @param err - The `ErrorEvent` instance to inspect + * @returns A plain object representation of the error + * @internal + */ +function inspectableError(err: ErrorEvent) { + return { + type: err.type, + message: err.message, + code: err.code, + defaultPrevented: err.defaultPrevented, + cancelable: err.cancelable, + timeStamp: err.timeStamp, + } +} diff --git a/src/client/eventsource/eventsource.ts b/src/client/eventsource/eventsource.ts new file mode 100644 index 000000000..8583b60db --- /dev/null +++ b/src/client/eventsource/eventsource.ts @@ -0,0 +1,601 @@ +import { + createParser, + type EventSourceMessage, + type EventSourceParser, +} from '../eventsource-parser' + +import { ErrorEvent, flattenError, syntaxError } from './errors' +import type { + AddEventListenerOptions, + EventListenerOptions, + EventListenerOrEventListenerObject, + EventSourceEventMap, + EventSourceInit, + FetchLike, + FetchLikeInit, + FetchLikeResponse, +} from './types' + +/** + * An `EventSource` instance opens a persistent connection to an HTTP server, which sends events + * in `text/event-stream` format. The connection remains open until closed by calling `.close()`. + * + * @public + * @example + * ```js + * const eventSource = new EventSource('https://example.com/stream') + * eventSource.addEventListener('error', (error) => { + * console.error(error) + * }) + * eventSource.addEventListener('message', (event) => { + * console.log('Received message:', event.data) + * }) + * ``` + */ +export class EventSource extends EventTarget { + /** + * ReadyState representing an EventSource currently trying to connect + * + * @public + */ + static CONNECTING = 0 as const + + /** + * ReadyState representing an EventSource connection that is open (eg connected) + * + * @public + */ + static OPEN = 1 as const + + /** + * ReadyState representing an EventSource connection that is closed (eg disconnected) + * + * @public + */ + static CLOSED = 2 as const + + /** + * ReadyState representing an EventSource currently trying to connect + * + * @public + */ + readonly CONNECTING = 0 as const + + /** + * ReadyState representing an EventSource connection that is open (eg connected) + * + * @public + */ + readonly OPEN = 1 as const + + /** + * ReadyState representing an EventSource connection that is closed (eg disconnected) + * + * @public + */ + readonly CLOSED = 2 as const + + /** + * Returns the state of this EventSource object's connection. It can have the values described below. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/readyState) + * + * Note: typed as `number` instead of `0 | 1 | 2` for compatibility with the `EventSource` interface, + * defined in the TypeScript `dom` library. + * + * @public + */ + public get readyState(): number { + return this.#readyState + } + + /** + * Returns the URL providing the event stream. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/url) + * + * @public + */ + public get url(): string { + return this.#url.href + } + + /** + * Returns true if the credentials mode for connection requests to the URL providing the event stream is set to "include", and false otherwise. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/withCredentials) + */ + public get withCredentials(): boolean { + return this.#withCredentials + } + + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/error_event) */ + public get onerror(): ((ev: ErrorEvent) => unknown) | null { + return this.#onError + } + public set onerror(value: ((ev: ErrorEvent) => unknown) | null) { + this.#onError = value + } + + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/message_event) */ + public get onmessage(): ((ev: MessageEvent) => unknown) | null { + return this.#onMessage + } + public set onmessage(value: ((ev: MessageEvent) => unknown) | null) { + this.#onMessage = value + } + + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/open_event) */ + public get onopen(): ((ev: Event) => unknown) | null { + return this.#onOpen + } + public set onopen(value: ((ev: Event) => unknown) | null) { + this.#onOpen = value + } + + override addEventListener( + type: K, + listener: (this: EventSource, ev: EventSourceEventMap[K]) => unknown, + options?: boolean | AddEventListenerOptions + ): void + override addEventListener( + type: string, + listener: (this: EventSource, event: MessageEvent) => unknown, + options?: boolean | AddEventListenerOptions + ): void + override addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions + ): void + override addEventListener( + type: string, + listener: + | ((this: EventSource, event: MessageEvent) => unknown) + | EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions + ): void { + const listen = listener as (this: EventSource, event: Event) => unknown + super.addEventListener(type, listen, options) + } + + override removeEventListener( + type: K, + listener: (this: EventSource, ev: EventSourceEventMap[K]) => unknown, + options?: boolean | EventListenerOptions + ): void + override removeEventListener( + type: string, + listener: (this: EventSource, event: MessageEvent) => unknown, + options?: boolean | EventListenerOptions + ): void + override removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions + ): void + override removeEventListener( + type: string, + listener: + | ((this: EventSource, event: MessageEvent) => unknown) + | EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions + ): void { + const listen = listener as (this: EventSource, event: Event) => unknown + super.removeEventListener(type, listen, options) + } + + constructor(url: string | URL, eventSourceInitDict?: EventSourceInit) { + super() + + try { + if (url instanceof URL) { + this.#url = url + } else if (typeof url === 'string') { + this.#url = new URL(url, getBaseURL()) + } else { + throw new Error('Invalid URL') + } + } catch (err) { + throw syntaxError('An invalid or illegal string was specified') + } + + this.#parser = createParser({ + onEvent: this.#onEvent, + onRetry: this.#onRetryChange, + }) + + this.#readyState = this.CONNECTING + this.#reconnectInterval = 3000 + this.#fetch = eventSourceInitDict?.fetch ?? globalThis.fetch + this.#withCredentials = eventSourceInitDict?.withCredentials ?? false + + this.#connect() + } + + /** + * Aborts any instances of the fetch algorithm started for this EventSource object, and sets the readyState attribute to CLOSED. + * + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/EventSource/close) + * + * @public + */ + close(): void { + if (this.#reconnectTimer) clearTimeout(this.#reconnectTimer) + if (this.#readyState === this.CLOSED) return + if (this.#controller) this.#controller.abort() + this.#readyState = this.CLOSED + this.#controller = undefined + } + + // PRIVATES FOLLOW + + /** + * Current connection state + * + * @internal + */ + #readyState: number + + /** + * Original URL used to connect. + * + * Note that this will stay the same even after a redirect. + * + * @internal + */ + #url: URL + + /** + * The destination URL after a redirect. Is reset on reconnection. + * + * @internal + */ + #redirectUrl: URL | undefined + + /** + * Whether to include credentials in the request + * + * @internal + */ + #withCredentials: boolean + + /** + * The fetch implementation to use + * + * @internal + */ + #fetch: FetchLike + + /** + * The reconnection time in milliseconds + * + * @internal + */ + #reconnectInterval: number + + /** + * Reference to an ongoing reconnect attempt, if any + * + * @internal + */ + #reconnectTimer: ReturnType | undefined + + /** + * The last event ID seen by the EventSource, which will be sent as `Last-Event-ID` in the + * request headers on a reconnection attempt. + * + * @internal + */ + #lastEventId: string | null = null + + /** + * The AbortController instance used to abort the fetch request + * + * @internal + */ + #controller: AbortController | undefined + + /** + * Instance of an EventSource parser (`eventsource-parser` npm module) + * + * @internal + */ + #parser: EventSourceParser + + /** + * Holds the current error handler, attached through `onerror` property directly. + * Note that `addEventListener('error', …)` will not be stored here. + * + * @internal + */ + #onError: ((ev: ErrorEvent) => unknown) | null = null + + /** + * Holds the current message handler, attached through `onmessage` property directly. + * Note that `addEventListener('message', …)` will not be stored here. + * + * @internal + */ + #onMessage: ((ev: MessageEvent) => unknown) | null = null + + /** + * Holds the current open handler, attached through `onopen` property directly. + * Note that `addEventListener('open', …)` will not be stored here. + * + * @internal + */ + #onOpen: ((ev: Event) => unknown) | null = null + + /** + * Connect to the given URL and start receiving events + * + * @internal + */ + #connect() { + this.#readyState = this.CONNECTING + this.#controller = new AbortController() + + // Browser tests are failing if we directly call `this.#fetch()`, thus the indirection. + const fetch = this.#fetch + fetch(this.#url, this.#getRequestOptions()) + .then(this.#onFetchResponse) + .catch(this.#onFetchError) + } + + /** + * Handles the fetch response + * + * @param response - The Fetch(ish) response + * @internal + */ + #onFetchResponse = async (response: FetchLikeResponse) => { + this.#parser.reset() + + const { body, redirected, status, headers } = response + + // [spec] a client can be told to stop reconnecting using the HTTP 204 No Content response code. + if (status === 204) { + // We still need to emit an error event - this mirrors the browser behavior, + // and without it there is no way to tell the user that the connection was closed. + this.#failConnection('Server sent HTTP 204, not reconnecting', 204) + this.close() + return + } + + // [spec] …Event stream requests can be redirected using HTTP 301 and 307 redirects as with + // [spec] normal HTTP requests. + // Spec does not say anything about other redirect codes (302, 308), but this seems an + // unintended omission, rather than a feature. Browsers will happily redirect on other 3xxs's. + if (redirected) { + this.#redirectUrl = new URL(response.url) + } else { + this.#redirectUrl = undefined + } + + // [spec] if res's status is not 200, …, then fail the connection. + if (status !== 200) { + this.#failConnection(`Non-200 status code (${status})`, status) + return + } + + // [spec] …or if res's `Content-Type` is not `text/event-stream`, then fail the connection. + const contentType = headers.get('content-type') || '' + if (!contentType.startsWith('text/event-stream')) { + this.#failConnection('Invalid content type, expected "text/event-stream"', status) + return + } + + // [spec] …if the readyState attribute is set to a value other than CLOSED… + if (this.#readyState === this.CLOSED) { + return + } + + // [spec] …sets the readyState attribute to OPEN and fires an event + // [spec] …named open at the EventSource object. + this.#readyState = this.OPEN + + const openEvent = new Event('open') + this.#onOpen?.(openEvent) + this.dispatchEvent(openEvent) + + // Ensure that the response stream is a web stream + if (typeof body !== 'object' || !body || !('getReader' in body)) { + this.#failConnection('Invalid response body, expected a web ReadableStream', status) + this.close() // This should only happen if `fetch` provided is "faulty" - don't reconnect + return + } + + const decoder = new TextDecoder() + + const reader = body.getReader() + let open = true + + do { + const { done, value } = await reader.read() + if (value) { + this.#parser.feed(decoder.decode(value as any, { stream: !done })) + } + + if (!done) { + continue + } + + open = false + this.#parser.reset() + + this.#scheduleReconnect() + } while (open) + } + + /** + * Handles rejected requests for the EventSource endpoint + * + * @param err - The error from `fetch()` + * @internal + */ + #onFetchError = (err: Error & { type?: string }) => { + this.#controller = undefined + + // We expect abort errors when the user manually calls `close()` - ignore those + if (err.name === 'AbortError' || err.type === 'aborted') { + return + } + + this.#scheduleReconnect(flattenError(err)) + } + + /** + * Get request options for the `fetch()` request + * + * @returns The request options + * @internal + */ + #getRequestOptions(): FetchLikeInit { + const lastEvent = this.#lastEventId ? { 'Last-Event-ID': this.#lastEventId } : undefined + const headers = { Accept: 'text/event-stream', ...lastEvent } + + const init: FetchLikeInit = { + // [spec] Let `corsAttributeState` be `Anonymous`… + // [spec] …will have their mode set to "cors"… + mode: 'cors', + redirect: 'follow', + headers, + cache: 'no-store', + signal: this.#controller?.signal, + } + + // Some environments crash if attempting to set `credentials` where it is not supported, + // eg on Cloudflare Workers. To avoid this, we only set it in browser-like environments. + if ('window' in globalThis) { + // [spec] …and their credentials mode set to "same-origin" + // [spec] …if the `withCredentials` attribute is `true`, set the credentials mode to "include"… + init.credentials = this.withCredentials ? 'include' : 'same-origin' + } + + return init + } + + /** + * Called by EventSourceParser instance when an event has successfully been parsed + * and is ready to be processed. + * + * @param event - The parsed event + * @internal + */ + #onEvent = (event: EventSourceMessage) => { + if (typeof event.id === 'string') { + this.#lastEventId = event.id + } + + const messageEvent = new MessageEvent(event.event || 'message', { + data: event.data, + origin: this.#redirectUrl ? this.#redirectUrl.origin : this.#url.origin, + lastEventId: event.id || '', + }) + + // The `onmessage` property of the EventSource instance only triggers on messages without an + // `event` field, or ones that explicitly set `message`. + if (this.#onMessage && (!event.event || event.event === 'message')) { + this.#onMessage(messageEvent) + } + + this.dispatchEvent(messageEvent) + } + + /** + * Called by EventSourceParser instance when a new reconnection interval is received + * from the EventSource endpoint. + * + * @param value - The new reconnection interval in milliseconds + * @internal + */ + #onRetryChange = (value: number) => { + this.#reconnectInterval = value + } + + /** + * Handles the process referred to in the EventSource specification as "failing a connection". + * + * @param error - The error causing the connection to fail + * @param code - The HTTP status code, if available + * @internal + */ + #failConnection(message?: string, code?: number) { + // [spec] …if the readyState attribute is set to a value other than CLOSED, + // [spec] sets the readyState attribute to CLOSED… + if (this.#readyState !== this.CLOSED) { + this.#readyState = this.CLOSED + } + + // [spec] …and fires an event named `error` at the `EventSource` object. + // [spec] Once the user agent has failed the connection, it does not attempt to reconnect. + // [spec] > Implementations are especially encouraged to report detailed information + // [spec] > to their development consoles whenever an error event is fired, since little + // [spec] > to no information can be made available in the events themselves. + // Printing to console is not very programatically helpful, though, so we emit a custom event. + const errorEvent = new ErrorEvent('error', { code, message }) + + this.#onError?.(errorEvent) + this.dispatchEvent(errorEvent) + } + + /** + * Schedules a reconnection attempt against the EventSource endpoint. + * + * @param message - The error causing the connection to fail + * @param code - The HTTP status code, if available + * @internal + */ + #scheduleReconnect(message?: string, code?: number) { + // [spec] If the readyState attribute is set to CLOSED, abort the task. + if (this.#readyState === this.CLOSED) { + return + } + + // [spec] Set the readyState attribute to CONNECTING. + this.#readyState = this.CONNECTING + + // [spec] Fire an event named `error` at the EventSource object. + const errorEvent = new ErrorEvent('error', { code, message }) + this.#onError?.(errorEvent) + this.dispatchEvent(errorEvent) + + // [spec] Wait a delay equal to the reconnection time of the event source. + this.#reconnectTimer = setTimeout(this.#reconnect, this.#reconnectInterval) + } + + /** + * Reconnects to the EventSource endpoint after a disconnect/failure + * + * @internal + */ + #reconnect = () => { + this.#reconnectTimer = undefined + + // [spec] If the EventSource's readyState attribute is not set to CONNECTING, then return. + if (this.#readyState !== this.CONNECTING) { + return + } + + this.#connect() + } +} + +/** + * According to spec, when constructing a URL: + * > 1. Let baseURL be environment's base URL, if environment is a Document object + * > 2. Return the result of applying the URL parser to url, with baseURL. + * + * Thus we should use `document.baseURI` if available, since it can be set through a base tag. + * + * @returns The base URL, if available - otherwise `undefined` + * @internal + */ +function getBaseURL(): string | undefined { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const doc = 'document' in globalThis ? (globalThis as any).document : undefined + return doc && typeof doc === 'object' && 'baseURI' in doc && typeof doc.baseURI === 'string' + ? doc.baseURI + : undefined +} diff --git a/src/client/eventsource/index.ts b/src/client/eventsource/index.ts new file mode 100644 index 000000000..057eb1364 --- /dev/null +++ b/src/client/eventsource/index.ts @@ -0,0 +1,11 @@ +/**! + * EventSource v3.0.5 + * https://github.com/EventSource/eventsource + * + * Copyright (c) EventSource GitHub organisation + * Licensed under the MIT license. + * https://github.com/EventSource/eventsource/blob/main/LICENSE + */ +export { ErrorEvent } from './errors' +export { EventSource } from './eventsource' +export type * from './types' diff --git a/src/client/eventsource/types.ts b/src/client/eventsource/types.ts new file mode 100644 index 000000000..384be8d7e --- /dev/null +++ b/src/client/eventsource/types.ts @@ -0,0 +1,137 @@ +import type { ErrorEvent } from './errors' + +/** + * Stripped down version of `fetch()`, only defining the parts we care about. + * This ensures it should work with "most" fetch implementations. + * + * @public + */ +export type FetchLike = (url: string | URL, init?: FetchLikeInit) => Promise + +/** + * Stripped down version of `RequestInit`, only defining the parts we care about. + * + * @public + */ +export interface FetchLikeInit { + /** An AbortSignal to set request's signal. Typed as `any` because of polyfill inconsistencies. */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + signal?: { aborted: boolean } | any + + /** A Headers object, an object literal, or an array of two-item arrays to set request's headers. */ + headers?: Record + + /** A string to indicate whether the request will use CORS, or will be restricted to same-origin URLs. Sets request's mode. */ + mode?: 'cors' | 'no-cors' | 'same-origin' + + /** A string indicating whether credentials will be sent with the request always, never, or only when sent to a same-origin URL. Sets request's credentials. */ + credentials?: 'include' | 'omit' | 'same-origin' + + /** Controls how the request is cached. */ + cache?: 'no-store' + + /** A string indicating whether request follows redirects, results in an error upon encountering a redirect, or returns the redirect (in an opaque fashion). Sets request's redirect. */ + redirect?: 'error' | 'follow' | 'manual' +} + +/** + * Stripped down version of `ReadableStreamDefaultReader`, only defining the parts we care about. + * + * @public + */ +export interface ReaderLike { + read(): Promise<{ done: false; value: unknown } | { done: true; value?: undefined }> + cancel(): Promise +} + +/** + * Minimal version of the `Response` type returned by `fetch()`. + * + * @public + */ +export interface FetchLikeResponse { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + readonly body: { getReader(): ReaderLike } | Response['body'] | null + readonly url: string + readonly status: number + readonly redirected: boolean + readonly headers: { get(name: string): string | null } +} + +/** + * Mirrors the official DOM typings, with the exception of the extended ErrorEvent. + * + * @public + */ +export interface EventSourceEventMap { + error: ErrorEvent + message: MessageEvent + open: Event +} + +/** + * Mirrors the official DOM typings (for the most part) + * + * @public + */ +export interface EventSourceInit { + /** + * A boolean value, defaulting to `false`, indicating if CORS should be set to `include` credentials. + */ + withCredentials?: boolean + + /** + * Optional fetch implementation to use. Defaults to `globalThis.fetch`. + * Can also be used for advanced use cases like mocking, proxying, custom certs etc. + */ + fetch?: FetchLike +} + +/** + * Mirrors the official DOM typings (sorta). + * + * @public + */ +export interface EventListenerOptions { + /** Not directly used by Node.js. Added for API completeness. Default: `false`. */ + capture?: boolean +} + +/** + * Mirrors the official DOM typings (sorta). + * + * @public + */ +export interface AddEventListenerOptions extends EventListenerOptions { + /** When `true`, the listener is automatically removed when it is first invoked. Default: `false`. */ + once?: boolean + /** When `true`, serves as a hint that the listener will not call the `Event` object's `preventDefault()` method. Default: false. */ + passive?: boolean + /** The listener will be removed when the given AbortSignal object's `abort()` method is called. */ + signal?: AbortSignal +} + +/** + * Mirrors the official DOM typings. + * + * @public + */ +export type EventListenerOrEventListenerObject = EventListener | EventListenerObject + +/** + * Mirrors the official DOM typings. + * + * @public + */ +export interface EventListener { + (evt: Event | MessageEvent): void +} + +/** + * Mirrors the official DOM typings. + * + * @public + */ +export interface EventListenerObject { + handleEvent(object: Event): void +} diff --git a/src/client/types.ts b/src/client/types.ts index a7cbe2e6f..839fbf572 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -3,6 +3,7 @@ import type { HonoBase } from '../hono-base' import type { Endpoint, ResponseFormat, Schema } from '../types' import type { StatusCode, SuccessStatusCode } from '../utils/http-status' import type { HasRequiredKeys } from '../utils/types' +import { EventSource, EventSourceInit } from './eventsource' type HonoRequest = (typeof Hono.prototype)['request'] @@ -46,12 +47,14 @@ export type ClientRequest = { : {} : {} ) => URL -} & (S['$get'] extends { outputFormat: 'ws' } - ? S['$get'] extends { input: infer I } - ? { - $ws: (args?: I) => WebSocket - } - : {} +} & (S['$get'] extends { outputFormat: 'ws'; input: infer I } + ? { + $ws: (args?: I) => WebSocket + } + : S['$get'] extends { outputFormat: 'sse' } + ? { + $sse: (args?: EventSourceInit) => EventSource + } : {}) type ClientResponseOfEndpoint = T extends { diff --git a/src/helper/streaming/sse.ts b/src/helper/streaming/sse.ts index 8c10eae51..3e027eb8a 100644 --- a/src/helper/streaming/sse.ts +++ b/src/helper/streaming/sse.ts @@ -1,5 +1,7 @@ import type { Context } from '../../context' +import { TypedResponse } from '../../types' import { HtmlEscapedCallbackPhase, resolveCallback } from '../../utils/html' +import { StatusCode } from '../../utils/http-status' import { StreamingApi } from '../../utils/stream' import { isOldBunVersion } from './utils' @@ -63,11 +65,13 @@ const run = async ( const contextStash: WeakMap = new WeakMap() +type SSEResponse = Response & TypedResponse + export const streamSSE = ( c: Context, cb: (stream: SSEStreamingApi) => Promise, onError?: (e: Error, stream: SSEStreamingApi) => Promise -): Response => { +): SSEResponse => { const { readable, writable } = new TransformStream() const stream = new SSEStreamingApi(writable, readable) @@ -90,5 +94,5 @@ export const streamSSE = ( run(stream, cb, onError) - return c.newResponse(stream.responseReadable) + return c.newResponse(stream.responseReadable) as SSEResponse }