diff --git a/deno_dist/context.ts b/deno_dist/context.ts index 0605f0105..756cc5a64 100644 --- a/deno_dist/context.ts +++ b/deno_dist/context.ts @@ -83,7 +83,7 @@ type ContextOptions = { notFoundHandler?: NotFoundHandler } -const TEXT_PLAIN = 'text/plain; charset=UTF-8' +export const TEXT_PLAIN = 'text/plain; charset=UTF-8' export class Context< // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -375,6 +375,9 @@ export class Context< return this.newResponse(null, status) } + /** @deprecated + * Use `streamText()` in `hono/streaming` instead of `c.streamText()`. The `c.streamText()` will be removed in v4. + */ streamText = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, @@ -387,6 +390,9 @@ export class Context< return this.stream(cb, arg, headers) } + /** @deprecated + * Use `stream()` in `hono/streaming` instead of `c.stream()`. The `c.stream()` will be removed in v4. + */ stream = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, diff --git a/deno_dist/helper/streaming/index.ts b/deno_dist/helper/streaming/index.ts index 503516af3..9a829577b 100644 --- a/deno_dist/helper/streaming/index.ts +++ b/deno_dist/helper/streaming/index.ts @@ -1,59 +1,12 @@ import type { Context } from '../../context.ts' import { StreamingApi } from '../../utils/stream.ts' -interface SSEMessage { - data: string - event?: string - id?: string +export const stream = (c: Context, cb: (stream: StreamingApi) => Promise): Response => { + const { readable, writable } = new TransformStream() + const stream = new StreamingApi(writable) + cb(stream).finally(() => stream.close()) + return c.newResponse(readable) } -class SSEStreamingApi extends StreamingApi { - constructor(writable: WritableStream) { - super(writable) - } - - async writeSSE(message: SSEMessage) { - const data = message.data - .split('\n') - .map((line) => { - return `data: ${line}` - }) - .join('\n') - - const sseData = - [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] - .filter(Boolean) - .join('\n') + '\n\n' - - await this.write(sseData) - } -} - -const setSSEHeaders = (context: Context) => { - context.header('Transfer-Encoding', 'chunked') - context.header('Content-Type', 'text/event-stream') - context.header('Cache-Control', 'no-cache') - context.header('Connection', 'keep-alive') -} - -export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { - return c.stream(async (originalStream: StreamingApi) => { - const { readable, writable } = new TransformStream() - const stream = new SSEStreamingApi(writable) - - originalStream.pipe(readable).catch((err) => { - console.error('Error in stream piping: ', err) - stream.close() - }) - - setSSEHeaders(c) - - try { - await cb(stream) - } catch (err) { - console.error('Error during streaming: ', err) - } finally { - await stream.close() - } - }) -} +export { streamSSE } from './sse.ts' +export { streamText } from './text.ts' diff --git a/deno_dist/helper/streaming/sse.ts b/deno_dist/helper/streaming/sse.ts new file mode 100644 index 000000000..a4e083a6b --- /dev/null +++ b/deno_dist/helper/streaming/sse.ts @@ -0,0 +1,60 @@ +import type { Context } from '../../context.ts' +import { StreamingApi } from '../../utils/stream.ts' +import { stream } from './index.ts' + +interface SSEMessage { + data: string + event?: string + id?: string +} + +class SSEStreamingApi extends StreamingApi { + constructor(writable: WritableStream) { + super(writable) + } + + async writeSSE(message: SSEMessage) { + const data = message.data + .split('\n') + .map((line) => { + return `data: ${line}` + }) + .join('\n') + + const sseData = + [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] + .filter(Boolean) + .join('\n') + '\n\n' + + await this.write(sseData) + } +} + +const setSSEHeaders = (context: Context) => { + context.header('Transfer-Encoding', 'chunked') + context.header('Content-Type', 'text/event-stream') + context.header('Cache-Control', 'no-cache') + context.header('Connection', 'keep-alive') +} + +export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { + return stream(c, async (originalStream: StreamingApi) => { + const { readable, writable } = new TransformStream() + const stream = new SSEStreamingApi(writable) + + originalStream.pipe(readable).catch((err) => { + console.error('Error in stream piping: ', err) + stream.close() + }) + + setSSEHeaders(c) + + try { + await cb(stream) + } catch (err) { + console.error('Error during streaming: ', err) + } finally { + await stream.close() + } + }) +} diff --git a/deno_dist/helper/streaming/text.ts b/deno_dist/helper/streaming/text.ts new file mode 100644 index 000000000..7f6a978c3 --- /dev/null +++ b/deno_dist/helper/streaming/text.ts @@ -0,0 +1,11 @@ +import type { Context } from '../../context.ts' +import { TEXT_PLAIN } from '../../context.ts' +import type { StreamingApi } from '../../utils/stream.ts' +import { stream } from './index.ts' + +export const streamText = (c: Context, cb: (stream: StreamingApi) => Promise): Response => { + c.header('Content-Type', TEXT_PLAIN) + c.header('X-Content-Type-Options', 'nosniff') + c.header('Transfer-Encoding', 'chunked') + return stream(c, cb) +} diff --git a/src/context.ts b/src/context.ts index 2a8f603d2..88bd1f1f0 100644 --- a/src/context.ts +++ b/src/context.ts @@ -83,7 +83,7 @@ type ContextOptions = { notFoundHandler?: NotFoundHandler } -const TEXT_PLAIN = 'text/plain; charset=UTF-8' +export const TEXT_PLAIN = 'text/plain; charset=UTF-8' export class Context< // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -375,6 +375,9 @@ export class Context< return this.newResponse(null, status) } + /** @deprecated + * Use `streamText()` in `hono/streaming` instead of `c.streamText()`. The `c.streamText()` will be removed in v4. + */ streamText = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, @@ -387,6 +390,9 @@ export class Context< return this.stream(cb, arg, headers) } + /** @deprecated + * Use `stream()` in `hono/streaming` instead of `c.stream()`. The `c.stream()` will be removed in v4. + */ stream = ( cb: (stream: StreamingApi) => Promise, arg?: StatusCode | ResponseInit, diff --git a/src/helper/streaming/index.test.ts b/src/helper/streaming/index.test.ts index e6c9ad157..c834c7df6 100644 --- a/src/helper/streaming/index.test.ts +++ b/src/helper/streaming/index.test.ts @@ -1,45 +1,28 @@ -import { Hono } from '../../hono' -import { streamSSE } from '.' +import { Context } from '../../context' +import { HonoRequest } from '../../request' +import { stream } from '.' -describe('SSE Streaming headers', () => { - it('Check SSE Response', async () => { - const app = new Hono() - app.get('/sse', async (c) => { - return streamSSE(c, async (stream) => { - let id = 0 - const maxIterations = 5 +describe('Basic Streaming Helper', () => { + const req = new HonoRequest(new Request('http://localhost/')) + let c: Context + beforeEach(() => { + c = new Context(req) + }) - while (id < maxIterations) { - const message = `Message\nIt is ${id}` - await stream.writeSSE({ data: message, event: 'time-update', id: String(id++) }) - await stream.sleep(100) - } - }) + it('Check SSE Response', async () => { + const res = stream(c, async (stream) => { + for (let i = 0; i < 3; i++) { + await stream.write(new Uint8Array([i])) + await stream.sleep(1) + } }) - - const res = await app.request('/sse') - expect(res).not.toBeNull() - expect(res.status).toBe(200) - expect(res.headers.get('Transfer-Encoding')).toEqual('chunked') - expect(res.headers.get('Content-Type')).toEqual('text/event-stream') - expect(res.headers.get('Cache-Control')).toEqual('no-cache') - expect(res.headers.get('Connection')).toEqual('keep-alive') - if (!res.body) { throw new Error('Body is null') } const reader = res.body.getReader() - const decoder = new TextDecoder() - for (let i = 0; i < 5; i++) { + for (let i = 0; i < 3; i++) { const { value } = await reader.read() - const decodedValue = decoder.decode(value) - - // Check the structure and content of the SSE message - let expectedValue = 'event: time-update\n' - expectedValue += 'data: Message\n' - expectedValue += `data: It is ${i}\n` - expectedValue += `id: ${i}\n\n` - expect(decodedValue).toBe(expectedValue) + expect(value).toEqual(new Uint8Array([i])) } }) }) diff --git a/src/helper/streaming/index.ts b/src/helper/streaming/index.ts index 669c5e399..65bee4322 100644 --- a/src/helper/streaming/index.ts +++ b/src/helper/streaming/index.ts @@ -1,59 +1,12 @@ import type { Context } from '../../context' import { StreamingApi } from '../../utils/stream' -interface SSEMessage { - data: string - event?: string - id?: string +export const stream = (c: Context, cb: (stream: StreamingApi) => Promise): Response => { + const { readable, writable } = new TransformStream() + const stream = new StreamingApi(writable) + cb(stream).finally(() => stream.close()) + return c.newResponse(readable) } -class SSEStreamingApi extends StreamingApi { - constructor(writable: WritableStream) { - super(writable) - } - - async writeSSE(message: SSEMessage) { - const data = message.data - .split('\n') - .map((line) => { - return `data: ${line}` - }) - .join('\n') - - const sseData = - [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] - .filter(Boolean) - .join('\n') + '\n\n' - - await this.write(sseData) - } -} - -const setSSEHeaders = (context: Context) => { - context.header('Transfer-Encoding', 'chunked') - context.header('Content-Type', 'text/event-stream') - context.header('Cache-Control', 'no-cache') - context.header('Connection', 'keep-alive') -} - -export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { - return c.stream(async (originalStream: StreamingApi) => { - const { readable, writable } = new TransformStream() - const stream = new SSEStreamingApi(writable) - - originalStream.pipe(readable).catch((err) => { - console.error('Error in stream piping: ', err) - stream.close() - }) - - setSSEHeaders(c) - - try { - await cb(stream) - } catch (err) { - console.error('Error during streaming: ', err) - } finally { - await stream.close() - } - }) -} +export { streamSSE } from './sse' +export { streamText } from './text' diff --git a/src/helper/streaming/sse.test.ts b/src/helper/streaming/sse.test.ts new file mode 100644 index 000000000..a6dc9b3a4 --- /dev/null +++ b/src/helper/streaming/sse.test.ts @@ -0,0 +1,48 @@ +import { Context } from '../../context' +import { HonoRequest } from '../../request' +import { streamSSE } from '.' + +describe('SSE Streaming helper', () => { + const req = new HonoRequest(new Request('http://localhost/')) + let c: Context + beforeEach(() => { + c = new Context(req) + }) + + it('Check streamSSE Response', async () => { + const res = streamSSE(c, async (stream) => { + let id = 0 + const maxIterations = 5 + + while (id < maxIterations) { + const message = `Message\nIt is ${id}` + await stream.writeSSE({ data: message, event: 'time-update', id: String(id++) }) + await stream.sleep(100) + } + }) + + expect(res).not.toBeNull() + expect(res.status).toBe(200) + expect(res.headers.get('Transfer-Encoding')).toEqual('chunked') + expect(res.headers.get('Content-Type')).toEqual('text/event-stream') + expect(res.headers.get('Cache-Control')).toEqual('no-cache') + expect(res.headers.get('Connection')).toEqual('keep-alive') + + if (!res.body) { + throw new Error('Body is null') + } + const reader = res.body.getReader() + const decoder = new TextDecoder() + for (let i = 0; i < 5; i++) { + const { value } = await reader.read() + const decodedValue = decoder.decode(value) + + // Check the structure and content of the SSE message + let expectedValue = 'event: time-update\n' + expectedValue += 'data: Message\n' + expectedValue += `data: It is ${i}\n` + expectedValue += `id: ${i}\n\n` + expect(decodedValue).toBe(expectedValue) + } + }) +}) diff --git a/src/helper/streaming/sse.ts b/src/helper/streaming/sse.ts new file mode 100644 index 000000000..0f43287ce --- /dev/null +++ b/src/helper/streaming/sse.ts @@ -0,0 +1,60 @@ +import type { Context } from '../../context' +import { StreamingApi } from '../../utils/stream' +import { stream } from '.' + +interface SSEMessage { + data: string + event?: string + id?: string +} + +class SSEStreamingApi extends StreamingApi { + constructor(writable: WritableStream) { + super(writable) + } + + async writeSSE(message: SSEMessage) { + const data = message.data + .split('\n') + .map((line) => { + return `data: ${line}` + }) + .join('\n') + + const sseData = + [message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`] + .filter(Boolean) + .join('\n') + '\n\n' + + await this.write(sseData) + } +} + +const setSSEHeaders = (context: Context) => { + context.header('Transfer-Encoding', 'chunked') + context.header('Content-Type', 'text/event-stream') + context.header('Cache-Control', 'no-cache') + context.header('Connection', 'keep-alive') +} + +export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise) => { + return stream(c, async (originalStream: StreamingApi) => { + const { readable, writable } = new TransformStream() + const stream = new SSEStreamingApi(writable) + + originalStream.pipe(readable).catch((err) => { + console.error('Error in stream piping: ', err) + stream.close() + }) + + setSSEHeaders(c) + + try { + await cb(stream) + } catch (err) { + console.error('Error during streaming: ', err) + } finally { + await stream.close() + } + }) +} diff --git a/src/helper/streaming/text.test.ts b/src/helper/streaming/text.test.ts new file mode 100644 index 000000000..a81584daf --- /dev/null +++ b/src/helper/streaming/text.test.ts @@ -0,0 +1,35 @@ +import { Context } from '../../context' +import { HonoRequest } from '../../request' +import { streamText } from '.' + +describe('Text Streaming Helper', () => { + const req = new HonoRequest(new Request('http://localhost/')) + let c: Context + beforeEach(() => { + c = new Context(req) + }) + + it('Check streamText Response', async () => { + const res = streamText(c, async (stream) => { + for (let i = 0; i < 3; i++) { + await stream.write(`${i}`) + await stream.sleep(1) + } + }) + + expect(res.status).toBe(200) + expect(res.headers.get('content-type')).toMatch(/^text\/plain/) + expect(res.headers.get('x-content-type-options')).toBe('nosniff') + expect(res.headers.get('transfer-encoding')).toBe('chunked') + + if (!res.body) { + throw new Error('Body is null') + } + const reader = res.body.getReader() + const decoder = new TextDecoder() + for (let i = 0; i < 3; i++) { + const { value } = await reader.read() + expect(decoder.decode(value)).toEqual(`${i}`) + } + }) +}) diff --git a/src/helper/streaming/text.ts b/src/helper/streaming/text.ts new file mode 100644 index 000000000..69a321a67 --- /dev/null +++ b/src/helper/streaming/text.ts @@ -0,0 +1,11 @@ +import type { Context } from '../../context' +import { TEXT_PLAIN } from '../../context' +import type { StreamingApi } from '../../utils/stream' +import { stream } from '.' + +export const streamText = (c: Context, cb: (stream: StreamingApi) => Promise): Response => { + c.header('Content-Type', TEXT_PLAIN) + c.header('X-Content-Type-Options', 'nosniff') + c.header('Transfer-Encoding', 'chunked') + return stream(c, cb) +}