Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PushableAbortable #434

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Stream } from '@libp2p/interface-connection'
import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { pushable, Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'
import { PushableAbortable } from './utils/pushable'

type OutboundStreamOpts = {
/** Max size in bytes for pushable buffer. If full, will throw on .push */
Expand All @@ -16,17 +16,15 @@ type InboundStreamOpts = {
}

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly closeController: AbortController
private readonly pushable = new PushableAbortable()
private readonly maxBufferSize: number

constructor(private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

pipe(
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
//
this.pushable,
(source) => encode(source),
this.rawStream
).catch(errCallback)
Expand All @@ -38,17 +36,15 @@ export class OutboundStream {
}

push(data: Uint8Array): void {
if (this.pushable.readableLength > this.maxBufferSize) {
if (this.pushable.bufferSize > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}

this.pushable.push(data)
}

close(): void {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
this.pushable.return()
this.pushable.abort()
this.rawStream.close()
}
}
Expand Down
115 changes: 115 additions & 0 deletions src/utils/linkedList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* The node for LinkedList below
*/
class Node<T> {
data: T
next: Node<T> | null = null
prev: Node<T> | null = null

constructor(data: T) {
this.data = data
}
}

/**
* We want to use this if we only need push/pop/shift method
* without random access.
* The shift() method should be cheaper than regular array.
*/
export class LinkedList<T> {
private _length: number
private head: Node<T> | null
private tail: Node<T> | null

constructor() {
this._length = 0
this.head = null
this.tail = null
}

get length(): number {
return this._length
}

push(data: T): void {
if (this._length === 0) {
this.tail = this.head = new Node(data)
this._length++
return
}

if (!this.head || !this.tail) {
// should not happen
throw Error('No head or tail')
}

const newTail = new Node(data)
this.tail.next = newTail
newTail.prev = this.tail
this.tail = newTail
this._length++
}

pop(): T | null {
const oldTail = this.tail
if (!oldTail) return null
this._length = Math.max(0, this._length - 1)

if (this._length === 0) {
this.head = this.tail = null
} else {
this.tail = oldTail.prev
if (this.tail) this.tail.next = null
oldTail.prev = oldTail.next = null
}

return oldTail.data
}

shift(): T | null {
const oldHead = this.head
if (!oldHead) return null
this._length = Math.max(0, this._length - 1)

if (this._length === 0) {
this.head = this.tail = null
} else {
this.head = oldHead.next
if (this.head) this.head.prev = null
oldHead.prev = oldHead.next = null
}

return oldHead.data
}

clear(): void {
this.head = this.tail = null
this._length = 0
}

toArray(): T[] {
let node = this.head
if (!node) return []

const arr: T[] = []
while (node) {
arr.push(node.data)
node = node.next
}

return arr
}

map<U>(fn: (t: T) => U): U[] {
let node = this.head
if (!node) return []

const arr: U[] = []
while (node) {
arr.push(fn(node.data))
node = node.next
}

return arr
}
}
65 changes: 65 additions & 0 deletions src/utils/pushable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { LinkedList } from './linkedList.js'

export class PushableAbortable {
private readonly items = new LinkedList<Uint8Array>()
private error: Error | null = null
private done = false
private onNext: () => void | null = null
private _bufferSize = 0

get bufferSize(): number {
return this._bufferSize
}

push(item: Uint8Array): void {
this.items.push(item)
this._bufferSize += item.length
this.onNext?.()
}

abort(err?: Error): void {
if (err) {
this.error = err
} else {
this.done = true
}
this.onNext?.()
}

[Symbol.asyncIterator](): AsyncIterator<Uint8Array> {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this

return {
async next() {
// eslint-disable-next-line no-constant-condition
while (true) {
const item = self.items.shift()
if (item !== null) {
self._bufferSize -= item.length
return { value: item, done: false }
}

if (self.error) {
throw self.error
}

if (self.done) {
// Is it correct to return undefined on done: true?
return { value: undefined as unknown as Uint8Array, done: true }
}

await new Promise<void>((resolve) => {
self.onNext = resolve
})
self.onNext = null
}
},

async return() {
// This will be reached if the consumer called 'break' or 'return' early in the loop.
return { value: undefined, done: true }
}
}
}
}