diff --git a/package.json b/package.json index 9e4f6129..510a40c8 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,6 @@ "@libp2p/peer-id": "^4.0.5", "@libp2p/pubsub": "^9.0.8", "@multiformats/multiaddr": "^12.1.14", - "abortable-iterator": "^5.0.1", "denque": "^2.1.0", "it-length-prefixed": "^9.0.4", "it-pipe": "^3.0.1", diff --git a/src/stream.ts b/src/stream.ts index 505c7897..7c9db2eb 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,4 +1,3 @@ -import { abortableSource } from 'abortable-iterator' import { encode, decode } from 'it-length-prefixed' import { pipe } from 'it-pipe' import { pushable, type Pushable } from 'it-pushable' @@ -25,8 +24,15 @@ export class OutboundStream { this.closeController = new AbortController() this.maxBufferSize = opts.maxBufferSize ?? Infinity + this.closeController.signal.addEventListener('abort', () => { + rawStream.close() + .catch(err => { + rawStream.abort(err) + }) + }) + pipe( - abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }), + this.pushable, this.rawStream ).catch(errCallback) } @@ -59,7 +65,6 @@ export class OutboundStream { this.closeController.abort() // similar to pushable.end() but clear the internal buffer await this.pushable.return() - await this.rawStream.close() } } @@ -73,17 +78,20 @@ export class InboundStream { this.rawStream = rawStream this.closeController = new AbortController() - this.source = abortableSource( - pipe(this.rawStream, (source) => decode(source, opts)), - this.closeController.signal, - { - returnOnAbort: true - } + this.closeController.signal.addEventListener('abort', () => { + rawStream.close() + .catch(err => { + rawStream.abort(err) + }) + }) + + this.source = pipe( + this.rawStream, + (source) => decode(source, opts) ) } async close (): Promise { this.closeController.abort() - await this.rawStream.close() } }