Skip to content

Commit

Permalink
refactor: will this work better
Browse files Browse the repository at this point in the history
  • Loading branch information
mary-ext committed Nov 25, 2024
1 parent c0e1ed8 commit 7048c9d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
14 changes: 11 additions & 3 deletions scripts/export-dids.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,17 @@ let firehoseCursor: number | undefined = state?.firehose.cursor;
console.log(` connecting to ${JETSTREAM_URL}`);
console.log(` starting ${cursor || `<root>`}`);

const ws = createWebSocketStream<JetstreamEvent>(
() => JETSTREAM_URL + `?cursor=${cursor}` + `&wantedCollections=invalid.nsid.record`,
);
const ws = createWebSocketStream<JetstreamEvent>({
url() {
return JETSTREAM_URL + `?cursor=${cursor}` + `&wantedCollections=invalid.nsid.record`;
},
onOpen() {
console.log(` connected`);
},
onClose() {
console.log(` closed`);
},
});

for await (const data of ws) {
if (data.time_us > cursor) {
Expand Down
24 changes: 23 additions & 1 deletion src/utils/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
import { WebSocket } from 'partysocket';

export const createWebSocketStream = <T = any>(url: () => string) => {
export const createWebSocketStream = <T = any>({
url,
onOpen,
onClose,
}: {
url: () => string;
onOpen?: () => void;
onClose?: () => void;
}) => {
let ws: WebSocket | undefined;
let closed = false;

return new ReadableStream<T>({
start(controller) {
ws = new WebSocket(url, null, { maxRetries: Infinity });
closed = false;

ws.addEventListener('open', () => {
onOpen?.();
});
ws.addEventListener('close', (ev) => {
onClose?.();

if (!closed && ev.wasClean) {
closed = true;
controller.close();
}
});
ws.addEventListener('message', (ev) => {
controller.enqueue(JSON.parse(ev.data));
});
},
cancel() {
closed = true;
ws?.close();
},
});
Expand Down

0 comments on commit 7048c9d

Please sign in to comment.