Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/POC): RPC support for SSE #3957

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/client/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import { validator } from '../validator'
import { hc } from './client'
import type { ClientResponse, InferRequestType, InferResponseType } from './types'
import { streamSSE } from '../helper/streaming'

Check failure on line 14 in src/client/client.test.ts

View workflow job for this annotation

GitHub Actions / Main

`../helper/streaming` import should occur before import of `../hono`
import { EventSource, EventSourceInit } from './eventsource'

Check failure on line 15 in src/client/client.test.ts

View workflow job for this annotation

GitHub Actions / Main

`./eventsource` import should occur before type import of `./types`

Check failure on line 15 in src/client/client.test.ts

View workflow job for this annotation

GitHub Actions / Main

All imports in the declaration are only used as types. Use `import type`

describe('Basic - JSON', () => {
const app = new Hono()
Expand Down Expand Up @@ -1335,3 +1337,36 @@
expect(webSocketMock).toHaveBeenCalledWith(expectedUrl, undefined)
})
})

describe('SSE Provider Integration', () => {
it('should initialize the SSE provider correctly', async () => {
const app = new Hono()

const route = app.get('/sse-endpoint', (c) =>
streamSSE(c, async (stream) => {
await stream.writeSSE({
data: 'Hello from server!',
event: 'message',
})
})
)

type AppType = typeof route

const client = hc<AppType>('http://localhost')
assertType<(args: EventSourceInit) => EventSource>(client['sse-endpoint'].$sse)
const es = client['sse-endpoint'].$sse()

expect(es.url).toBe('http://localhost/sse-endpoint')

const eventData = await new Promise<MessageEvent>((resolve) => {
es.onmessage = (data) => {
expect(data).toBeInstanceOf(MessageEvent)
resolve(data)
}
})

expect(eventData.data).toBe('Hello from server!')
expect(eventData.type).toBe('message')
})
})
4 changes: 4 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Hono } from '../hono'
import type { FormValue, ValidationTargets } from '../types'
import { serialize } from '../utils/cookie'
import type { UnionToIntersection } from '../utils/types'
import { EventSource } from './eventsource'
import type { Callback, Client, ClientRequestOptions } from './types'
import {
buildSearchParams,
Expand Down Expand Up @@ -193,6 +194,9 @@ export const hc = <T extends Hono<any, any, any>>(

return establishWebSocket(targetUrl.toString())
}
if (method === 'sse') {
return new EventSource(url, opts.args[0])
}

const req = new ClientRequestImpl(url, method)
if (method) {
Expand Down
44 changes: 44 additions & 0 deletions src/client/eventsource-parser/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* The type of error that occurred.
* @public
*/
export type ErrorType = 'invalid-retry' | 'unknown-field'

/**
* Error thrown when encountering an issue during parsing.
*
* @public
*/
export class ParseError extends Error {
/**
* The type of error that occurred.
*/
type: ErrorType

/**
* In the case of an unknown field encountered in the stream, this will be the field name.
*/
field?: string

/**
* In the case of an unknown field encountered in the stream, this will be the value of the field.
*/
value?: string

/**
* The line that caused the error, if available.
*/
line?: string

constructor(
message: string,
options: { type: ErrorType; field?: string; value?: string; line?: string }
) {
super(message)
this.name = 'ParseError'
this.type = options.type
this.field = options.field
this.value = options.value
this.line = options.line
}
}
11 changes: 11 additions & 0 deletions src/client/eventsource-parser/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**!
* EventSource Parser v3.0.0
* https://github.com/rexxars/eventsource-parser
*
* Copyright (c) 2024 Espen Hovlandsdal <espen@hovlandsdal.com>
* Licensed under the MIT license.
* https://github.com/rexxars/eventsource-parser/blob/main/LICENSE
*/
export { type ErrorType, ParseError } from './errors'
export { createParser } from './parse'
export type { EventSourceMessage, EventSourceParser, ParserCallbacks } from './types.ts'
214 changes: 214 additions & 0 deletions src/client/eventsource-parser/parse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/**
* EventSource/Server-Sent Events parser
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
*/
import { ParseError } from './errors'
import type { EventSourceParser, ParserCallbacks } from './types'

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function noop(_arg: unknown) {
// intentional noop
}

/**
* Creates a new EventSource parser.
*
* @param callbacks - Callbacks to invoke on different parsing events:
* - `onEvent` when a new event is parsed
* - `onError` when an error occurs
* - `onRetry` when a new reconnection interval has been sent from the server
* - `onComment` when a comment is encountered in the stream
*
* @returns A new EventSource parser, with `parse` and `reset` methods.
* @public
*/
export function createParser(callbacks: ParserCallbacks): EventSourceParser {
if (typeof callbacks === 'function') {
throw new TypeError(
'`callbacks` must be an object, got a function instead. Did you mean `{onEvent: fn}`?'
)
}

const { onEvent = noop, onError = noop, onRetry = noop, onComment } = callbacks

let incompleteLine = ''

let isFirstChunk = true
let id: string | undefined
let data = ''
let eventType = ''

function feed(newChunk: string) {
// Strip any UTF8 byte order mark (BOM) at the start of the stream
const chunk = isFirstChunk ? newChunk.replace(/^\xEF\xBB\xBF/, '') : newChunk

// If there was a previous incomplete line, append it to the new chunk,
// so we may process it together as a new (hopefully complete) chunk.
const [complete, incomplete] = splitLines(`${incompleteLine}${chunk}`)

for (const line of complete) {
parseLine(line)
}

incompleteLine = incomplete
isFirstChunk = false
}

function parseLine(line: string) {
// If the line is empty (a blank line), dispatch the event
if (line === '') {
dispatchEvent()
return
}

// If the line starts with a U+003A COLON character (:), ignore the line.
if (line.startsWith(':')) {
if (onComment) {
onComment(line.slice(line.startsWith(': ') ? 2 : 1))
}
return
}

// If the line contains a U+003A COLON character (:)
const fieldSeparatorIndex = line.indexOf(':')
if (fieldSeparatorIndex !== -1) {
// Collect the characters on the line before the first U+003A COLON character (:),
// and let `field` be that string.
const field = line.slice(0, fieldSeparatorIndex)

// Collect the characters on the line after the first U+003A COLON character (:),
// and let `value` be that string. If value starts with a U+0020 SPACE character,
// remove it from value.
const offset = line[fieldSeparatorIndex + 1] === ' ' ? 2 : 1
const value = line.slice(fieldSeparatorIndex + offset)

processField(field, value, line)
return
}

// Otherwise, the string is not empty but does not contain a U+003A COLON character (:)
// Process the field using the whole line as the field name, and an empty string as the field value.
// 👆 This is according to spec. That means that a line that has the value `data` will result in
// a newline being added to the current `data` buffer, for instance.
processField(line, '', line)
}

function processField(field: string, value: string, line: string) {
// Field names must be compared literally, with no case folding performed.
switch (field) {
case 'event':
// Set the `event type` buffer to field value
eventType = value
break
case 'data':
// Append the field value to the `data` buffer, then append a single U+000A LINE FEED(LF)
// character to the `data` buffer.
data = `${data}${value}\n`
break
case 'id':
// If the field value does not contain U+0000 NULL, then set the `ID` buffer to
// the field value. Otherwise, ignore the field.
id = value.includes('\0') ? undefined : value
break
case 'retry':
// If the field value consists of only ASCII digits, then interpret the field value as an
// integer in base ten, and set the event stream's reconnection time to that integer.
// Otherwise, ignore the field.
if (/^\d+$/.test(value)) {
onRetry(parseInt(value, 10))
} else {
onError(
new ParseError(`Invalid \`retry\` value: "${value}"`, {
type: 'invalid-retry',
value,
line,
})
)
}
break
default:
// Otherwise, the field is ignored.
onError(
new ParseError(
`Unknown field "${field.length > 20 ? `${field.slice(0, 20)}…` : field}"`,
{ type: 'unknown-field', field, value, line }
)
)
break
}
}

function dispatchEvent() {
const shouldDispatch = data.length > 0
if (shouldDispatch) {
onEvent({
id,
event: eventType || undefined,
// If the data buffer's last character is a U+000A LINE FEED (LF) character,
// then remove the last character from the data buffer.
data: data.endsWith('\n') ? data.slice(0, -1) : data,
})
}

// Reset for the next event
id = undefined
data = ''
eventType = ''
}

function reset(options: { consume?: boolean } = {}) {
if (incompleteLine && options.consume) {
parseLine(incompleteLine)
}

id = undefined
data = ''
eventType = ''
incompleteLine = ''
}

return { feed, reset }
}

/**
* For the given `chunk`, split it into lines according to spec, and return any remaining incomplete line.
*
* @param chunk - The chunk to split into lines
* @returns A tuple containing an array of complete lines, and any remaining incomplete line
* @internal
*/
function splitLines(chunk: string): [Array<string>, string] {
/**
* According to the spec, a line is terminated by either:
* - U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair
* - a single U+000A LINE FEED(LF) character not preceded by a U+000D CARRIAGE RETURN(CR) character
* - a single U+000D CARRIAGE RETURN(CR) character not followed by a U+000A LINE FEED(LF) character
*/

const lines: Array<string> = []
let incompleteLine = ''

const totalLength = chunk.length
for (let i = 0; i < totalLength; i++) {
const char = chunk[i]

if (char === '\r' && chunk[i + 1] === '\n') {
// CRLF
lines.push(incompleteLine)
incompleteLine = ''
i++ // Skip the LF character
} else if (char === '\r') {
// Standalone CR
lines.push(incompleteLine)
incompleteLine = ''
} else if (char === '\n') {
// Standalone LF
lines.push(incompleteLine)
incompleteLine = ''
} else {
incompleteLine += char
}
}

return [lines, incompleteLine]
}
Loading
Loading