diff --git a/README.md b/README.md index 37a680a..bec696c 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ const { Level } = require('level') const WriteStream = require('level-ws') const db = new Level('./db', { valueEncoding: 'json' }) -const ws = WriteStream(db) +const ws = new WriteStream(db) ws.on('close', function () { console.log('Done!') @@ -40,7 +40,7 @@ ws.end() ## API -### `ws = WriteStream(db[, options])` +### `ws = new WriteStream(db[, options])` Create a [writable stream](https://nodejs.org/dist/latest-v8.x/docs/api/stream.html#stream_class_stream_writable) that operates in object mode, accepting batch operations to be committed with `db.batch()` on each tick of the Node.js event loop. The optional `options` argument may contain: diff --git a/level-ws.js b/level-ws.js index e72444d..db6e60d 100644 --- a/level-ws.js +++ b/level-ws.js @@ -1,90 +1,84 @@ 'use strict' const Writable = require('readable-stream').Writable -const inherits = require('inherits') -const defaultOptions = { type: 'put' } +class WriteStream extends Writable { + constructor (db, options) { + options = Object.assign({ type: 'put' }, options) -function WriteStream (db, options) { - if (!(this instanceof WriteStream)) { - return new WriteStream(db, options) - } - - options = Object.assign({}, defaultOptions, options) - - Writable.call(this, { - objectMode: true, - highWaterMark: options.highWaterMark || 16 - }) + super({ + objectMode: true, + highWaterMark: options.highWaterMark || 16 + }) - this._options = options - this._db = db - this._buffer = [] - this._flushing = false - this._maxBufferLength = options.maxBufferLength || Infinity + this._options = options + this._db = db + this._buffer = [] + this._flushing = false + this._maxBufferLength = options.maxBufferLength || Infinity + this._flush = this._flush.bind(this) - this.on('finish', () => { - this.emit('close') - }) -} + this.on('finish', () => { + this.emit('close') + }) + } -inherits(WriteStream, Writable) + _write (data, enc, next) { + if (this.destroyed) return -WriteStream.prototype._write = function (data, enc, next) { - if (this.destroyed) return + if (!this._flushing) { + this._flushing = true + process.nextTick(this._flush) + } - if (!this._flushing) { - this._flushing = true - process.nextTick(() => { this._flush() }) + if (this._buffer.length >= this._maxBufferLength) { + this.once('_flush', (err) => { + if (err) return this.destroy(err) + this._write(data, enc, next) + }) + } else { + this._buffer.push(Object.assign({ type: this._options.type }, data)) + next() + } } - if (this._buffer.length >= this._maxBufferLength) { - this.once('_flush', (err) => { - if (err) return this.destroy(err) - this._write(data, enc, next) - }) - } else { - this._buffer.push(Object.assign({ type: this._options.type }, data)) - next() - } -} + _flush () { + const buffer = this._buffer -WriteStream.prototype._flush = function () { - const buffer = this._buffer + if (this.destroyed) return - if (this.destroyed) return + this._buffer = [] + this._db.batch(buffer, (err) => { + this._flushing = false - this._buffer = [] - this._db.batch(buffer, (err) => { - this._flushing = false + if (!this.emit('_flush', err) && err) { + // There was no _flush listener. + this.destroy(err) + } + }) + } - if (!this.emit('_flush', err) && err) { - // There was no _flush listener. - this.destroy(err) + _final (cb) { + if (this._flushing) { + // Wait for scheduled or in-progress _flush() + this.once('_flush', (err) => { + if (err) return cb(err) + + // There could be additional buffered writes + this._final(cb) + }) + } else if (this._buffer && this._buffer.length) { + this.once('_flush', cb) + this._flush() + } else { + cb() } - }) -} - -WriteStream.prototype._final = function (cb) { - if (this._flushing) { - // Wait for scheduled or in-progress _flush() - this.once('_flush', (err) => { - if (err) return cb(err) - - // There could be additional buffered writes - this._final(cb) - }) - } else if (this._buffer && this._buffer.length) { - this.once('_flush', cb) - this._flush() - } else { - cb() } -} -WriteStream.prototype._destroy = function (err, cb) { - this._buffer = null - cb(err) + _destroy (err, cb) { + this._buffer = null + cb(err) + } } module.exports = WriteStream diff --git a/package.json b/package.json index 5d3ff7e..80d9cfb 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,6 @@ "UPGRADING.md" ], "dependencies": { - "inherits": "^2.0.3", "readable-stream": "^3.1.0" }, "devDependencies": { diff --git a/test.js b/test.js index 359230d..f3f4bed 100644 --- a/test.js +++ b/test.js @@ -77,7 +77,7 @@ function test (label, options, fn) { // TODO: test various encodings test('test simple WriteStream', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.notOk(err, 'no error') }) @@ -89,7 +89,7 @@ test('test simple WriteStream', function (t, ctx, done) { }) test('test WriteStream with async writes', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) const sourceData = ctx.sourceData let i = -1 @@ -121,7 +121,7 @@ test('race condition between batch callback and close event', function (t, ctx, // Delaying the batch should not be a problem slowdown(ctx.db) - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) let i = 0 ws.on('error', function (err) { @@ -141,7 +141,7 @@ test('race condition between batch callback and close event', function (t, ctx, test('race condition between two flushes', function (t, ctx, done) { slowdown(ctx.db) - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) const order = monitor(ws) ws.on('close', function () { @@ -166,7 +166,7 @@ test('race condition between two flushes', function (t, ctx, done) { }) test('test end accepts data', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) let i = 0 ws.on('error', function (err) { @@ -184,7 +184,7 @@ test('test end accepts data', function (t, ctx, done) { }) test('test destroy()', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) const verify = function () { ctx.db.iterator().all(function (err, result) { @@ -205,7 +205,7 @@ test('test destroy()', function (t, ctx, done) { }) test('test destroy(err)', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) const order = monitor(ws) ws.on('error', function (err) { @@ -242,7 +242,7 @@ test('test json encoding', { keyEncoding: 'utf8', valueEncoding: 'json' }, funct { key: 'cc', value: { c: 'w00t', d: { e: [0, 10, 20, 30], f: 1, g: 'wow' } } } ] - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.notOk(err, 'no error') }) @@ -267,7 +267,7 @@ test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEnc ] function del () { - const delStream = WriteStream(ctx.db) + const delStream = new WriteStream(ctx.db) delStream.on('error', function (err) { t.notOk(err, 'no error') }) @@ -290,7 +290,7 @@ test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEnc }) } - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.notOk(err, 'no error') }) @@ -317,7 +317,7 @@ test('test del capabilities as constructor option', { keyEncoding: 'utf8', value ] function del () { - const delStream = WriteStream(ctx.db, { type: 'del' }) + const delStream = new WriteStream(ctx.db, { type: 'del' }) delStream.on('error', function (err) { t.notOk(err, 'no error') }) @@ -339,7 +339,7 @@ test('test del capabilities as constructor option', { keyEncoding: 'utf8', value }) } - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.notOk(err, 'no error') }) @@ -369,7 +369,7 @@ test('test type at key/value level must take precedence on the constructor', { k exception.type = 'put' function del () { - const delStream = WriteStream(ctx.db, { type: 'del' }) + const delStream = new WriteStream(ctx.db, { type: 'del' }) delStream.on('error', function (err) { t.notOk(err, 'no error') }) @@ -392,7 +392,7 @@ test('test type at key/value level must take precedence on the constructor', { k }) } - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.notOk(err, 'no error') }) @@ -419,7 +419,7 @@ test('test that missing type errors', function (t, ctx, done) { }) } - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) ws.on('error', function (err) { t.equal(err.message, 'A batch operation must have a type property that is \'put\' or \'del\'', 'should error') errored = true @@ -432,7 +432,7 @@ test('test that missing type errors', function (t, ctx, done) { }) test('test limbo batch error', function (t, ctx, done) { - const ws = WriteStream(ctx.db) + const ws = new WriteStream(ctx.db) const order = monitor(ws) monkeyBatch(ctx.db, function (original, ops, options, cb) { @@ -454,7 +454,7 @@ test('test limbo batch error', function (t, ctx, done) { }) test('test batch error when buffer is full', function (t, ctx, done) { - const ws = WriteStream(ctx.db, { maxBufferLength: 1 }) + const ws = new WriteStream(ctx.db, { maxBufferLength: 1 }) const order = monitor(ws) monkeyBatch(ctx.db, function (original, ops, options, cb) { @@ -477,7 +477,7 @@ test('test batch error when buffer is full', function (t, ctx, done) { }) test('test destroy while waiting to drain', function (t, ctx, done) { - const ws = WriteStream(ctx.db, { maxBufferLength: 1 }) + const ws = new WriteStream(ctx.db, { maxBufferLength: 1 }) const order = monitor(ws) ws.on('error', function (err) { @@ -506,7 +506,7 @@ test('test destroy while waiting to drain', function (t, ctx, done) { function testMaxBuffer (max, randomize) { return function (t, ctx, done) { - const ws = WriteStream(ctx.db, { maxBufferLength: max }) + const ws = new WriteStream(ctx.db, { maxBufferLength: max }) const sourceData = [] const batches = []