-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathget-stream-from-async-iterable.ts
45 lines (39 loc) · 1.6 KB
/
get-stream-from-async-iterable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import { CustomProgressEvent } from 'progress-events'
import type { VerifiedFetchInit } from '../index.js'
import type { ComponentLogger } from '@libp2p/interface'
/**
 * Converts an async iterable of Uint8Array chunks into a `ReadableStream` and
 * also returns the first chunk of bytes.
 *
 * The first chunk is read eagerly, before the stream is created, so that
 * callers can inspect it; the same chunk is still enqueued as the first item
 * of the returned stream. Remaining chunks are pulled from the iterator on
 * demand as the stream is read.
 *
 * If the consumer cancels the stream, the iterator's `return()` method (when
 * it has one) is invoked so the source can run its cleanup.
 *
 * @param iterator - source of byte chunks
 * @param path - only used in the error log message when no content is found
 * @param logger - used to create the logger for this component
 * @param options - `onProgress`, when supplied, receives a
 * `verified-fetch:request:progress:chunk` event for every chunk that is
 * enqueued. `signal` is accepted by the type but is not consulted here.
 * @returns the stream (which includes the first chunk) and the first chunk
 * @throws Error with message `No content found` when the iterable finishes
 * without yielding anything
 */
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress' | 'signal'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
  const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable')
  const reader = iterator[Symbol.asyncIterator]()
  const { value: firstChunk, done } = await reader.next()

  if (done === true) {
    log.error('no content found for path', path)
    throw new Error('No content found')
  }

  const stream = new ReadableStream<Uint8Array>({
    async start (controller) {
      // the initial value is already available
      options?.onProgress?.(new CustomProgressEvent<void>('verified-fetch:request:progress:chunk'))
      controller.enqueue(firstChunk)
    },
    async pull (controller) {
      const { value, done } = await reader.next()

      if (done === true) {
        // an iterator may hand back a final value together with `done`
        if (value != null) {
          options?.onProgress?.(new CustomProgressEvent<void>('verified-fetch:request:progress:chunk'))
          controller.enqueue(value)
        }

        controller.close()
        return
      }

      options?.onProgress?.(new CustomProgressEvent<void>('verified-fetch:request:progress:chunk'))
      controller.enqueue(value)
    },
    async cancel () {
      // the consumer stopped reading early - tell the source iterator so it
      // can run its cleanup instead of being left suspended indefinitely
      await reader.return?.()
    }
  })

  return {
    stream,
    firstChunk
  }
}