Skip to content

Commit

Permalink
Add option to limit OutboundStream buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Aug 13, 2022
1 parent 9f0be04 commit 57fb6bb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
12 changes: 10 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
* streams that are allowed to be open concurrently
*/
maxOutboundStreams?: number

/**
* Specify max buffer size in bytes for OutboundStream.
* If full it will throw and reject sending any more data.
*/
maxOutboundBufferSize?: number
}

export interface GossipsubMessage {
Expand Down Expand Up @@ -691,8 +697,10 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
}

try {
const stream = new OutboundStream(await connection.newStream(this.multicodecs), (e) =>
this.log.error('outbound pipe error', e)
const stream = new OutboundStream(
await connection.newStream(this.multicodecs),
(e) => this.log.error('outbound pipe error', e),
{ maxBufferSize: this.opts.maxOutboundBufferSize }
)

this.log('create outbound stream %p', peerId)
Expand Down
17 changes: 13 additions & 4 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ import { pushable, Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'

type OutboundStreamOpts = {
/** Max size in bytes for pushable buffer. If full, will throw on .push */
maxBufferSize?: number
}

export class OutboundStream {
private readonly rawStream: Stream
private readonly pushable: Pushable<Uint8Array>
private readonly closeController: AbortController
private readonly maxBufferSize: number

constructor(rawStream: Stream, errCallback: (e: Error) => void) {
this.rawStream = rawStream
this.pushable = pushable()
constructor(private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

pipe(
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
Expand All @@ -28,6 +33,10 @@ export class OutboundStream {
}

push(data: Uint8Array): void {
if (this.pushable.readableLength > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}

this.pushable.push(data)
}

Expand Down

0 comments on commit 57fb6bb

Please sign in to comment.