From 308bde70a37a327bbd231345a3c9c3c8678d8cd4 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sat, 2 Jul 2022 15:07:01 +0200 Subject: [PATCH] Replace `duplexify` and friends with `readable-stream` v4 (#7) Co-authored-by: Robert Nagy --- guest.js | 20 +++++++++++--------- host.js | 9 ++++----- package.json | 8 +++----- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/guest.js b/guest.js index 51f11ca..d3abf63 100644 --- a/guest.js +++ b/guest.js @@ -1,11 +1,10 @@ 'use strict' -const duplexify = require('duplexify') const { AbstractLevel, AbstractIterator } = require('abstract-level') -const eos = require('end-of-stream') -const lpstream = require('length-prefixed-stream') +const lpstream = require('@vweevers/length-prefixed-stream') const ModuleError = require('module-error') const { input, output } = require('./tags') +const { Duplex, pipeline, finished } = require('readable-stream') const kExplicitClose = Symbol('explicitClose') const kAbortRequests = Symbol('abortRequests') @@ -108,10 +107,8 @@ class ManyLevelGuest extends AbstractLevel { self[kFlushed]() }) - const proxy = duplexify() - proxy.setWritable(decode) - proxy.setReadable(encode) - eos(proxy, cleanup) + const proxy = Duplex.from({ writable: decode, readable: encode }) + finished(proxy, cleanup) this[kRpcStream] = proxy return proxy @@ -312,7 +309,7 @@ class ManyLevelGuest extends AbstractLevel { this[kAbortRequests]('Aborted on database close()', 'LEVEL_DATABASE_NOT_OPEN') if (this[kRpcStream]) { - eos(this[kRpcStream], () => { + finished(this[kRpcStream], () => { this[kRpcStream] = null this._close(cb) }) @@ -330,7 +327,12 @@ class ManyLevelGuest extends AbstractLevel { // For tests only so does not need error handling this[kExplicitClose] = false const remote = this[kRemote]() - remote.pipe(this.connect()).pipe(remote) + pipeline( + remote, + this.connect(), + remote, + () => {} + ) } else if (this[kExplicitClose]) { throw new ModuleError('Cannot reopen many-level database after close()', { code: 'LEVEL_NOT_SUPPORTED' diff --git a/host.js b/host.js index 513c6dc..cb0bd78 100644 --- a/host.js +++ b/host.js @@ -1,9 +1,8 @@ 'use strict' -const lpstream = require('length-prefixed-stream') +const lpstream = require('@vweevers/length-prefixed-stream') const ModuleError = require('module-error') -const eos = require('end-of-stream') -const duplexify = require('duplexify') +const { Duplex, finished } = require('readable-stream') const { input, output } = require('./tags') const rangeOptions = new Set(['gt', 'gte', 'lt', 'lte']) @@ -60,7 +59,7 @@ function createRpcStream (db, options, streamOptions) { const readonly = options.readonly const decode = lpstream.decode() const encode = lpstream.encode() - const stream = duplexify(decode, encode) + const stream = Duplex.from({ writable: decode, readable: encode }) const preput = options.preput const predel = options.predel @@ -85,7 +84,7 @@ function createRpcStream (db, options, streamOptions) { const iterators = new Map() - eos(stream, function () { + finished(stream, function () { for (const iterator of iterators.values()) { iterator.close() } diff --git a/package.json b/package.json index c25f836..e0a6de0 100644 --- a/package.json +++ b/package.json @@ -25,12 +25,11 @@ "UPGRADING.md" ], "dependencies": { + "@vweevers/length-prefixed-stream": "^1.0.0", "abstract-level": "^1.0.3", - "duplexify": "^4.1.1", - "end-of-stream": "^1.1.0", - "length-prefixed-stream": "^2.0.0", "module-error": "^1.0.2", - "protocol-buffers-encodings": "^1.1.0" + "protocol-buffers-encodings": "^1.1.0", + "readable-stream": "^4.0.0" }, "devDependencies": { "@types/readable-stream": "^2.3.13", @@ -43,7 +42,6 @@ "memory-level": "^1.0.0", "nyc": "^15.1.0", "protocol-buffers": "^5.0.0", - "readable-stream": "^3.6.0", "standard": "^16.0.3", "tape": "^5.0.1", "ts-standard": "^11.0.0",