Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move c.stream* to helper #1846

Merged
merged 13 commits into from
Dec 26, 2023
Merged
8 changes: 7 additions & 1 deletion deno_dist/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type ContextOptions<E extends Env> = {
notFoundHandler?: NotFoundHandler<E>
}

const TEXT_PLAIN = 'text/plain; charset=UTF-8'
export const TEXT_PLAIN = 'text/plain; charset=UTF-8'

export class Context<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -375,6 +375,9 @@ export class Context<
return this.newResponse(null, status)
}

/** @deprecated
* Use `streamText()` in `hono/streaming` instead of `c.streamText()`. The `c.streamText()` will be removed in v4.
*/
streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
Expand All @@ -387,6 +390,9 @@ export class Context<
return this.stream(cb, arg, headers)
}

/** @deprecated
* Use `stream()` in `hono/streaming` instead of `c.stream()`. The `c.stream()` will be removed in v4.
*/
stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
Expand Down
61 changes: 7 additions & 54 deletions deno_dist/helper/streaming/index.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,12 @@
import type { Context } from '../../context.ts'
import { StreamingApi } from '../../utils/stream.ts'

interface SSEMessage {
data: string
event?: string
id?: string
export const stream = (c: Context, cb: (stream: StreamingApi) => Promise<void>): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())
return c.newResponse(readable)
}

class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream) {
super(writable)
}

async writeSSE(message: SSEMessage) {
const data = message.data
.split('\n')
.map((line) => {
return `data: ${line}`
})
.join('\n')

const sseData =
[message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`]
.filter(Boolean)
.join('\n') + '\n\n'

await this.write(sseData)
}
}

const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
}

export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise<void>) => {
return c.stream(async (originalStream: StreamingApi) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable)

originalStream.pipe(readable).catch((err) => {
console.error('Error in stream piping: ', err)
stream.close()
})

setSSEHeaders(c)

try {
await cb(stream)
} catch (err) {
console.error('Error during streaming: ', err)
} finally {
await stream.close()
}
})
}
export { streamSSE } from './sse.ts'
export { streamText } from './text.ts'
60 changes: 60 additions & 0 deletions deno_dist/helper/streaming/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { Context } from '../../context.ts'
import { StreamingApi } from '../../utils/stream.ts'
import { stream } from './index.ts'

interface SSEMessage {
data: string
event?: string
id?: string
}

class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream) {
super(writable)
}

async writeSSE(message: SSEMessage) {
const data = message.data
.split('\n')
.map((line) => {
return `data: ${line}`
})
.join('\n')

const sseData =
[message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`]
.filter(Boolean)
.join('\n') + '\n\n'

await this.write(sseData)
}
}

const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
}

export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise<void>) => {
return stream(c, async (originalStream: StreamingApi) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable)

originalStream.pipe(readable).catch((err) => {
console.error('Error in stream piping: ', err)
stream.close()
})

setSSEHeaders(c)

try {
await cb(stream)
} catch (err) {
console.error('Error during streaming: ', err)
} finally {
await stream.close()
}
})
}
11 changes: 11 additions & 0 deletions deno_dist/helper/streaming/text.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { Context } from '../../context.ts'
import { TEXT_PLAIN } from '../../context.ts'
import type { StreamingApi } from '../../utils/stream.ts'
import { stream } from './index.ts'

export const streamText = (c: Context, cb: (stream: StreamingApi) => Promise<void>): Response => {
c.header('Content-Type', TEXT_PLAIN)
c.header('X-Content-Type-Options', 'nosniff')
c.header('Transfer-Encoding', 'chunked')
return stream(c, cb)
}
8 changes: 7 additions & 1 deletion src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type ContextOptions<E extends Env> = {
notFoundHandler?: NotFoundHandler<E>
}

const TEXT_PLAIN = 'text/plain; charset=UTF-8'
export const TEXT_PLAIN = 'text/plain; charset=UTF-8'

export class Context<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -375,6 +375,9 @@ export class Context<
return this.newResponse(null, status)
}

/** @deprecated
* Use `streamText()` in `hono/streaming` instead of `c.streamText()`. The `c.streamText()` will be removed in v4.
*/
streamText = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
Expand All @@ -387,6 +390,9 @@ export class Context<
return this.stream(cb, arg, headers)
}

/** @deprecated
* Use `stream()` in `hono/streaming` instead of `c.stream()`. The `c.stream()` will be removed in v4.
*/
stream = (
cb: (stream: StreamingApi) => Promise<void>,
arg?: StatusCode | ResponseInit,
Expand Down
51 changes: 17 additions & 34 deletions src/helper/streaming/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,28 @@
import { Hono } from '../../hono'
import { streamSSE } from '.'
import { Context } from '../../context'
import { HonoRequest } from '../../request'
import { stream } from '.'

describe('SSE Streaming headers', () => {
it('Check SSE Response', async () => {
const app = new Hono()
app.get('/sse', async (c) => {
return streamSSE(c, async (stream) => {
let id = 0
const maxIterations = 5
describe('Basic Streaming Helper', () => {
const req = new HonoRequest(new Request('http://localhost/'))
let c: Context
beforeEach(() => {
c = new Context(req)
})

while (id < maxIterations) {
const message = `Message\nIt is ${id}`
await stream.writeSSE({ data: message, event: 'time-update', id: String(id++) })
await stream.sleep(100)
}
})
it('Check SSE Response', async () => {
const res = stream(c, async (stream) => {
for (let i = 0; i < 3; i++) {
await stream.write(new Uint8Array([i]))
await stream.sleep(1)
}
})

const res = await app.request('/sse')
expect(res).not.toBeNull()
expect(res.status).toBe(200)
expect(res.headers.get('Transfer-Encoding')).toEqual('chunked')
expect(res.headers.get('Content-Type')).toEqual('text/event-stream')
expect(res.headers.get('Cache-Control')).toEqual('no-cache')
expect(res.headers.get('Connection')).toEqual('keep-alive')

if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
for (let i = 0; i < 5; i++) {
for (let i = 0; i < 3; i++) {
const { value } = await reader.read()
const decodedValue = decoder.decode(value)

// Check the structure and content of the SSE message
let expectedValue = 'event: time-update\n'
expectedValue += 'data: Message\n'
expectedValue += `data: It is ${i}\n`
expectedValue += `id: ${i}\n\n`
expect(decodedValue).toBe(expectedValue)
expect(value).toEqual(new Uint8Array([i]))
}
})
})
61 changes: 7 additions & 54 deletions src/helper/streaming/index.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,12 @@
import type { Context } from '../../context'
import { StreamingApi } from '../../utils/stream'

interface SSEMessage {
data: string
event?: string
id?: string
export const stream = (c: Context, cb: (stream: StreamingApi) => Promise<void>): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable)
cb(stream).finally(() => stream.close())
return c.newResponse(readable)
}

class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream) {
super(writable)
}

async writeSSE(message: SSEMessage) {
const data = message.data
.split('\n')
.map((line) => {
return `data: ${line}`
})
.join('\n')

const sseData =
[message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`]
.filter(Boolean)
.join('\n') + '\n\n'

await this.write(sseData)
}
}

const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
}

export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise<void>) => {
return c.stream(async (originalStream: StreamingApi) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable)

originalStream.pipe(readable).catch((err) => {
console.error('Error in stream piping: ', err)
stream.close()
})

setSSEHeaders(c)

try {
await cb(stream)
} catch (err) {
console.error('Error during streaming: ', err)
} finally {
await stream.close()
}
})
}
export { streamSSE } from './sse'
export { streamText } from './text'
48 changes: 48 additions & 0 deletions src/helper/streaming/sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Context } from '../../context'
import { HonoRequest } from '../../request'
import { streamSSE } from '.'

describe('SSE Streaming helper', () => {
const req = new HonoRequest(new Request('http://localhost/'))
let c: Context
beforeEach(() => {
c = new Context(req)
})

it('Check streamSSE Response', async () => {
const res = streamSSE(c, async (stream) => {
let id = 0
const maxIterations = 5

while (id < maxIterations) {
const message = `Message\nIt is ${id}`
await stream.writeSSE({ data: message, event: 'time-update', id: String(id++) })
await stream.sleep(100)
}
})

expect(res).not.toBeNull()
expect(res.status).toBe(200)
expect(res.headers.get('Transfer-Encoding')).toEqual('chunked')
expect(res.headers.get('Content-Type')).toEqual('text/event-stream')
expect(res.headers.get('Cache-Control')).toEqual('no-cache')
expect(res.headers.get('Connection')).toEqual('keep-alive')

if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
for (let i = 0; i < 5; i++) {
const { value } = await reader.read()
const decodedValue = decoder.decode(value)

// Check the structure and content of the SSE message
let expectedValue = 'event: time-update\n'
expectedValue += 'data: Message\n'
expectedValue += `data: It is ${i}\n`
expectedValue += `id: ${i}\n\n`
expect(decodedValue).toBe(expectedValue)
}
})
})
Loading
Loading