-
-
Notifications
You must be signed in to change notification settings - Fork 292
/
events.ts
61 lines (53 loc) · 2.53 KB
/
events.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import {ChainForkConfig} from "@lodestar/config";
import {Api, BeaconEvent, routesData, getEventSerdes} from "../routes/events.js";
import {stringifyQuery} from "../../utils/client/format.js";
import {getEventSource} from "../../utils/client/eventSource.js";
import {HttpStatusCode} from "../../utils/client/httpStatusCode.js";
/**
* REST HTTP client for events routes
*/
export function getClient(config: ChainForkConfig, baseUrl: string): Api {
const eventSerdes = getEventSerdes(config);
return {
eventstream: async (topics, signal, onEvent) => {
const query = stringifyQuery({topics});
// TODO: Use a proper URL formatter
const url = `${baseUrl}${routesData.eventstream.url}?${query}`;
// eslint-disable-next-line @typescript-eslint/naming-convention
const EventSource = await getEventSource();
const eventSource = new EventSource(url);
try {
await new Promise<void>((resolve, reject) => {
for (const topic of topics) {
eventSource.addEventListener(topic, ((event: MessageEvent) => {
const message = eventSerdes.fromJson(topic, JSON.parse(event.data));
onEvent({type: topic, message} as BeaconEvent);
}) as EventListener);
}
// EventSource will try to reconnect always on all errors
// `eventSource.onerror` events are informative but don't indicate the EventSource closed
// The only way to abort the connection from the client is via eventSource.close()
eventSource.onerror = function onerror(err) {
const errEs = err as unknown as EventSourceError;
// Consider 400 and 500 status errors unrecoverable, close the eventsource
if (errEs.status === 400) {
reject(Error(`400 Invalid topics: ${errEs.message}`));
}
if (errEs.status === 500) {
reject(Error(`500 Internal Server Error: ${errEs.message}`));
}
// TODO: else log the error somewhere
// console.log("eventstream client error", errEs);
};
// And abort resolve the promise so finally {} eventSource.close()
signal.addEventListener("abort", () => resolve(), {once: true});
});
} finally {
eventSource.close();
}
return {ok: true, response: undefined, status: HttpStatusCode.OK};
},
};
}
// https://github.com/EventSource/eventsource/blob/82e034389bd2c08d532c63172b8e858c5b185338/lib/eventsource.js#L143
type EventSourceError = {status: number; message: string};