diff --git a/deno_dist/context.ts b/deno_dist/context.ts index 0605f0105b..4c928dd7e9 100644 --- a/deno_dist/context.ts +++ b/deno_dist/context.ts @@ -8,7 +8,7 @@ import type { StatusCode } from './utils/http-status.ts' import { StreamingApi } from './utils/stream.ts' import type { JSONValue, InterfaceToType, JSONParsed } from './utils/types.ts' -type HeaderRecord = Record +export type HeaderRecord = Record type Data = string | ArrayBuffer | ReadableStream export interface ExecutionContext { @@ -83,7 +83,7 @@ type ContextOptions = { notFoundHandler?: NotFoundHandler } -const TEXT_PLAIN = 'text/plain; charset=UTF-8' +export const TEXT_PLAIN = 'text/plain; charset=UTF-8' export class Context< // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -375,6 +375,9 @@ export class Context< return this.newResponse(null, status) } + /** @deprecated + * Use `streamText()` in `hono/helper/streaming` instead of `c.streamText()`. The `c.streamText()` will be removed in v4. + */ streamText = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, @@ -387,6 +390,9 @@ export class Context< return this.stream(cb, arg, headers) } + /** @deprecated + * Use `stream()` in `hono/helper/streaming` instead of `c.stream()`. The `c.stream()` will be removed in v4. + */ stream = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, diff --git a/deno_dist/helper/streaming/index.ts b/deno_dist/helper/streaming/index.ts index 503516af34..ae13edf192 100644 --- a/deno_dist/helper/streaming/index.ts +++ b/deno_dist/helper/streaming/index.ts @@ -1,59 +1,21 @@ -import type { Context } from '../../context.ts' +import { Context, HeaderRecord, TEXT_PLAIN } from '../../context.ts' +import { StatusCode } from '../../utils/http-status.ts' import { StreamingApi } from '../../utils/stream.ts' -interface SSEMessage { - data: string - event?: string - id?: string +export const stream = ( + c: Context, + cb: (stream: StreamingApi) => Promise, + arg?: StatusCode | ResponseInit, + headers?: HeaderRecord +): Response => { + const { readable, writable } = new TransformStream() + const stream = new StreamingApi(writable) + cb(stream).finally(() => stream.close()) + + return typeof arg === 'number' + ? c.newResponse(readable, arg, headers) + : c.newResponse(readable, arg) } -class SSEStreamingApi extends StreamingApi { - constructor(writable: WritableStream) { - super(writable) - } - - async writeSSE(message: SSEMessage) { - const data = message.data - .split('\n') - .map((line) => { - return `data: ${line}` - }) - .join('\n') - - const sseData = - [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] - .filter(Boolean) - .join('\n') + '\n\n' - - await this.write(sseData) - } -} - -const setSSEHeaders = (context: Context) => { - context.header('Transfer-Encoding', 'chunked') - context.header('Content-Type', 'text/event-stream') - context.header('Cache-Control', 'no-cache') - context.header('Connection', 'keep-alive') -} - -export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { - return c.stream(async (originalStream: StreamingApi) => { - const { readable, writable } = new TransformStream() - const stream = new SSEStreamingApi(writable) - - originalStream.pipe(readable).catch((err) => { - console.error('Error in stream piping: ', err) - stream.close() - }) - - setSSEHeaders(c) - - try { - await cb(stream) - } catch (err) { - console.error('Error during streaming: ', err) - } finally { - await stream.close() - } - }) -} +export { streamSSE } from './sse.ts' +export { streamText } from './text.ts' diff --git a/deno_dist/helper/streaming/sse.ts b/deno_dist/helper/streaming/sse.ts new file mode 100644 index 0000000000..71a69ec012 --- /dev/null +++ b/deno_dist/helper/streaming/sse.ts @@ -0,0 +1,60 @@ +import { stream } from './index.ts' +import type { Context } from '../../context.ts' +import { StreamingApi } from '../../utils/stream.ts' + +interface SSEMessage { + data: string + event?: string + id?: string +} + +class SSEStreamingApi extends StreamingApi { + constructor(writable: WritableStream) { + super(writable) + } + + async writeSSE(message: SSEMessage) { + const data = message.data + .split('\n') + .map((line) => { + return `data: ${line}` + }) + .join('\n') + + const sseData = + [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] + .filter(Boolean) + .join('\n') + '\n\n' + + await this.write(sseData) + } +} + +const setSSEHeaders = (context: Context) => { + context.header('Transfer-Encoding', 'chunked') + context.header('Content-Type', 'text/event-stream') + context.header('Cache-Control', 'no-cache') + context.header('Connection', 'keep-alive') +} + +export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { + return stream(c, async (originalStream: StreamingApi) => { + const { readable, writable } = new TransformStream() + const stream = new SSEStreamingApi(writable) + + originalStream.pipe(readable).catch((err) => { + console.error('Error in stream piping: ', err) + stream.close() + }) + + setSSEHeaders(c) + + try { + await cb(stream) + } catch (err) { + console.error('Error during streaming: ', err) + } finally { + await stream.close() + } + }) +} diff --git a/deno_dist/helper/streaming/text.ts b/deno_dist/helper/streaming/text.ts new file mode 100644 index 0000000000..5cba3f5719 --- /dev/null +++ b/deno_dist/helper/streaming/text.ts @@ -0,0 +1,17 @@ +import { stream } from './index.ts' +import { Context, HeaderRecord, TEXT_PLAIN } from '../../context.ts' +import { StatusCode } from '../../utils/http-status.ts' +import { StreamingApi } from '../../utils/stream.ts' + +export const streamText = ( + c: Context, + cb: (stream: StreamingApi) => Promise, + arg?: StatusCode | ResponseInit, + headers?: HeaderRecord +): Response => { + headers ??= {} + c.header('content-type', TEXT_PLAIN) + c.header('x-content-type-options', 'nosniff') + c.header('transfer-encoding', 'chunked') + return stream(c, cb, arg, headers) +}