From 84100d81427604be9a4399c86207f15dbd154a47 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 5 Oct 2023 00:10:22 +0300 Subject: [PATCH 1/5] add flush callback --- README.md | 4 +- index.js | 49 +++++++++++++-- test/flush.test.js | 145 +++++++++++++++++++++++++++++++++++++++++++++ types/index.d.ts | 2 +- 4 files changed, 193 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0889af9..c28c157 100644 --- a/README.md +++ b/README.md @@ -83,11 +83,13 @@ For `sync:true` this is not relevant because the `'ready'` event will be fired w Writes the string to the file. It will return false to signal the producer to slow down. -### SonicBoom#flush() +### SonicBoom#flush([cb]) Writes the current buffer to the file if a write was not in progress. Do nothing if `minLength` is zero or if it is already writing. +call the callback when the flush operation is completed. when failed the callback is called with an error. + ### SonicBoom#reopen([file]) Reopen the file in place, useful for log rotation. diff --git a/index.js b/index.js index f5589b5..6d0adac 100644 --- a/index.js +++ b/index.js @@ -336,12 +336,40 @@ function writeBuffer (data) { return this._len < this._hwm } -function flush () { +function callFlushCallbackOnDrain (cb) { + const onDrain = () => { + cb() + this.off('error', onError) + } + const onError = (err) => { + cb(err) + this.off('drain', onDrain) + } + this.once('drain', onDrain) + this.once('error', onError) +} + +function flush (cb) { + if (cb != null && typeof cb !== 'function') { + throw new Error('flush cb must be a function') + } + if (this.destroyed) { - throw new Error('SonicBoom destroyed') + const error = new Error('SonicBoom destroyed') + cb?.(error) + throw error } - if (this._writing || this.minLength <= 0) { + if (this.minLength <= 0) { + cb?.() + return + } + + if (cb) { + callFlushCallbackOnDrain.call(this, cb) + } + + if (this._writing) { return } @@ -352,15 +380,26 @@ function flush () { this._actualWrite() } -function flushBuffer () { +function flushBuffer (cb) { + if (cb != null && typeof cb !== 'function') { + throw new Error('flush cb must be a function') + } + if (this.destroyed) { - throw new Error('SonicBoom destroyed') + const error = new Error('SonicBoom destroyed') + cb?.(error) + throw error } if (this._writing || this.minLength <= 0) { + cb?.() return } + if (cb) { + callFlushCallbackOnDrain.call(this, cb) + } + if (this._bufs.length === 0) { this._bufs.push([]) this._lens.push(0) diff --git a/test/flush.test.js b/test/flush.test.js index 8bf2092..27e4905 100644 --- a/test/flush.test.js +++ b/test/flush.test.js @@ -4,6 +4,7 @@ const fs = require('fs') const path = require('path') const SonicBoom = require('../') const { file, runTests } = require('./helper') +const proxyquire = require('proxyquire') runTests(buildTests) @@ -100,4 +101,148 @@ function buildTests (test, sync) { t.pass('drain emitted') }) }) + + test('call flush cb after flushed', (t) => { + t.plan(4) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ fd, minLength: 4096, sync }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + + t.ok(stream.write('hello world\n')) + t.ok(stream.write('something else\n')) + + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + }) + }) + + test('call flush cb even when have no data', (t) => { + t.plan(2) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ fd, minLength: 4096, sync }) + + stream.on('ready', () => { + t.pass('ready emitted') + + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + }) + }) + }) + + test('call flush cb even when minLength is 0', (t) => { + t.plan(1) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ fd, minLength: 0, sync }) + + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + }) + }) + + test('call flush cb with an error when trying to flush destroyed stream', (t) => { + t.plan(1) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ fd, minLength: 4096, sync }) + stream.destroy() + + try { + stream.flush((err) => { + if (err) t.pass(err) + else t.fail('flush cb called without an error') + }) + } catch { + // ignore + } + }) + + test('call flush cb with an error when failed to flush', (t) => { + t.plan(5) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('../', { + fs: fakeFs + }) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ + fd, + sync: false, + minLength: 1000 + }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + + const err = new Error('other') + err.code = 'other' + fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = fs.write + Error.captureStackTrace(err) + t.pass('fake fs.write called') + cb(err) + } + + t.ok(stream.write('hello world\n')) + stream.flush((err) => { + if (err) t.equal(err.code, 'other') + else t.fail('flush cb called without an error') + }) + + stream.end() + + stream.on('close', () => { + t.pass('close emitted') + }) + }) + + test('call flush cb when finish writing when currently in the middle', (t) => { + t.plan(4) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('../', { + fs: fakeFs + }) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ + fd, + sync: false, + minLength: 1 + }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + + fakeFs.write = function (...args) { + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + }) + + t.pass('fake fs.write called') + fakeFs.write = fs.write + return fakeFs.write(...args) + } + + t.ok(stream.write('hello world\n')) + }) } diff --git a/types/index.d.ts b/types/index.d.ts index 5408636..9df073d 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -38,7 +38,7 @@ export class SonicBoom extends EventEmitter { * Writes the current buffer to the file if a write was not in progress. * Do nothing if minLength is zero or if it is already writing. */ - flush(): void; + flush(cb?: (err?: Error) => unknown): void; /** * Reopen the file in place, useful for log rotation. From 6192d137eb9557e50183bcedf3084ce82603a608 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 5 Oct 2023 01:09:07 +0300 Subject: [PATCH 2/5] add fsync --- index.js | 30 ++++++++++++-- test/flush.test.js | 101 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 119 insertions(+), 12 deletions(-) diff --git a/index.js b/index.js index 6d0adac..976f163 100644 --- a/index.js +++ b/index.js @@ -338,13 +338,19 @@ function writeBuffer (data) { function callFlushCallbackOnDrain (cb) { const onDrain = () => { - cb() + // only if _fsync is false to avoid double fsync + if (!this._fsync) { + fs.fsync(this.fd, cb) + } else { + cb() + } this.off('error', onError) } const onError = (err) => { cb(err) this.off('drain', onDrain) } + this.once('drain', onDrain) this.once('error', onError) } @@ -356,7 +362,11 @@ function flush (cb) { if (this.destroyed) { const error = new Error('SonicBoom destroyed') - cb?.(error) + if (cb) { + cb(error) + return + } + throw error } @@ -387,11 +397,15 @@ function flushBuffer (cb) { if (this.destroyed) { const error = new Error('SonicBoom destroyed') - cb?.(error) + if (cb) { + cb(error) + return + } + throw error } - if (this._writing || this.minLength <= 0) { + if (this.minLength <= 0) { cb?.() return } @@ -400,6 +414,14 @@ function flushBuffer (cb) { callFlushCallbackOnDrain.call(this, cb) } + if (this._writing) { + return + } + + if (cb) { + callFlushCallbackOnDrain.call(this, cb) + } + if (this._bufs.length === 0) { this._bufs.push([]) this._lens.push(0) diff --git a/test/flush.test.js b/test/flush.test.js index 27e4905..10c8ce4 100644 --- a/test/flush.test.js +++ b/test/flush.test.js @@ -122,6 +122,95 @@ function buildTests (test, sync) { }) }) + test('only call fsyncSync and not fsync when fsync: true', (t) => { + t.plan(6) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('../', { + fs: fakeFs + }) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ + fd, + sync: false, + fsync: true, + minLength: 100 + }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + + fakeFs.fsync = function (fd, cb) { + t.fail('fake fs.fsync called while should not') + cb() + } + fakeFs.fsyncSync = function (fd) { + t.pass('fake fsyncSync called') + } + + fakeFs.write = function (...args) { + t.pass('fake fs.write called') + fakeFs.write = fs.write + return fakeFs.write(...args) + } + + t.ok(stream.write('hello world\n')) + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + + process.nextTick(() => { + // to make sure fsync is not called as well + t.pass('nextTick after flush called') + }) + }) + }) + + test('call flush cb with error when fsync failed', (t) => { + t.plan(5) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('../', { + fs: fakeFs + }) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ + fd, + sync: false, + minLength: 100 + }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + + const err = new Error('other') + err.code = 'other' + fakeFs.fsync = function (fd, cb) { + fakeFs.fsync = fs.fsync + Error.captureStackTrace(err) + t.pass('fake fs.fsync called') + cb(err) + } + + fakeFs.write = function (...args) { + t.pass('fake fs.write called') + fakeFs.write = fs.write + return fakeFs.write(...args) + } + + t.ok(stream.write('hello world\n')) + stream.flush((err) => { + if (err) t.equal(err.code, 'other') + else t.fail('flush cb called without an error') + }) + }) + test('call flush cb even when have no data', (t) => { t.plan(2) @@ -160,14 +249,10 @@ function buildTests (test, sync) { const stream = new SonicBoom({ fd, minLength: 4096, sync }) stream.destroy() - try { - stream.flush((err) => { - if (err) t.pass(err) - else t.fail('flush cb called without an error') - }) - } catch { - // ignore - } + stream.flush((err) => { + if (err) t.pass(err) + else t.fail('flush cb called without an error') + }) }) test('call flush cb with an error when failed to flush', (t) => { From 135d5295068102fb76c21a0eace0a648bdb94096 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 5 Oct 2023 01:20:49 +0300 Subject: [PATCH 3/5] use same minLength --- test/flush.test.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/flush.test.js b/test/flush.test.js index 10c8ce4..0f18535 100644 --- a/test/flush.test.js +++ b/test/flush.test.js @@ -136,7 +136,7 @@ function buildTests (test, sync) { fd, sync: false, fsync: true, - minLength: 100 + minLength: 4096 }) stream.on('ready', () => { @@ -182,7 +182,7 @@ function buildTests (test, sync) { const stream = new SonicBoom({ fd, sync: false, - minLength: 100 + minLength: 4096 }) stream.on('ready', () => { @@ -268,7 +268,7 @@ function buildTests (test, sync) { const stream = new SonicBoom({ fd, sync: false, - minLength: 1000 + minLength: 4096 }) stream.on('ready', () => { @@ -310,6 +310,8 @@ function buildTests (test, sync) { const stream = new SonicBoom({ fd, sync: false, + + // to trigger write without calling flush minLength: 1 }) From 26e261aa1c5109aefaa86feb61c5aea9a2044337 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 5 Oct 2023 01:26:38 +0300 Subject: [PATCH 4/5] add sync tests variation and fix --- index.js | 4 -- test/flush.test.js | 103 +++++++++++++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 35 deletions(-) diff --git a/index.js b/index.js index 976f163..dc537ef 100644 --- a/index.js +++ b/index.js @@ -418,10 +418,6 @@ function flushBuffer (cb) { return } - if (cb) { - callFlushCallbackOnDrain.call(this, cb) - } - if (this._bufs.length === 0) { this._bufs.push([]) this._lens.push(0) diff --git a/test/flush.test.js b/test/flush.test.js index 0f18535..f886b0d 100644 --- a/test/flush.test.js +++ b/test/flush.test.js @@ -134,7 +134,7 @@ function buildTests (test, sync) { const fd = fs.openSync(dest, 'w') const stream = new SonicBoom({ fd, - sync: false, + sync, fsync: true, minLength: 4096 }) @@ -151,10 +151,18 @@ function buildTests (test, sync) { t.pass('fake fsyncSync called') } - fakeFs.write = function (...args) { - t.pass('fake fs.write called') - fakeFs.write = fs.write - return fakeFs.write(...args) + function successOnAsyncOrSyncFn (isSync, originalFn) { + return function (...args) { + t.pass(`fake fs.${originalFn.name} called`) + fakeFs[originalFn.name] = originalFn + return fakeFs[originalFn.name](...args) + } + } + + if (sync) { + fakeFs.writeSync = successOnAsyncOrSyncFn(true, fs.writeSync) + } else { + fakeFs.write = successOnAsyncOrSyncFn(false, fs.write) } t.ok(stream.write('hello world\n')) @@ -181,7 +189,7 @@ function buildTests (test, sync) { const fd = fs.openSync(dest, 'w') const stream = new SonicBoom({ fd, - sync: false, + sync, minLength: 4096 }) @@ -191,17 +199,33 @@ function buildTests (test, sync) { const err = new Error('other') err.code = 'other' - fakeFs.fsync = function (fd, cb) { - fakeFs.fsync = fs.fsync - Error.captureStackTrace(err) - t.pass('fake fs.fsync called') - cb(err) + + function onFsyncOnFsyncSync (isSync, originalFn) { + return function (...args) { + Error.captureStackTrace(err) + t.pass(`fake fs.${originalFn.name} called`) + fakeFs[originalFn.name] = originalFn + const cb = args[args.length - 1] + + cb(err) + } } - fakeFs.write = function (...args) { - t.pass('fake fs.write called') - fakeFs.write = fs.write - return fakeFs.write(...args) + // only one is called depends on sync + fakeFs.fsync = onFsyncOnFsyncSync(false, fs.fsync) + + function successOnAsyncOrSyncFn (isSync, originalFn) { + return function (...args) { + t.pass(`fake fs.${originalFn.name} called`) + fakeFs[originalFn.name] = originalFn + return fakeFs[originalFn.name](...args) + } + } + + if (sync) { + fakeFs.writeSync = successOnAsyncOrSyncFn(true, fs.writeSync) + } else { + fakeFs.write = successOnAsyncOrSyncFn(false, fs.write) } t.ok(stream.write('hello world\n')) @@ -267,7 +291,7 @@ function buildTests (test, sync) { const fd = fs.openSync(dest, 'w') const stream = new SonicBoom({ fd, - sync: false, + sync, minLength: 4096 }) @@ -277,13 +301,24 @@ function buildTests (test, sync) { const err = new Error('other') err.code = 'other' - fakeFs.write = function (fd, buf, enc, cb) { - fakeFs.write = fs.write - Error.captureStackTrace(err) - t.pass('fake fs.write called') - cb(err) + + function onWriteOrWriteSync (isSync, originalFn) { + return function (...args) { + Error.captureStackTrace(err) + t.pass(`fake fs.${originalFn.name} called`) + fakeFs[originalFn.name] = originalFn + + if (isSync) throw err + const cb = args[args.length - 1] + + cb(err) + } } + // only one is called depends on sync + fakeFs.write = onWriteOrWriteSync(false, fs.write) + fakeFs.writeSync = onWriteOrWriteSync(true, fs.writeSync) + t.ok(stream.write('hello world\n')) stream.flush((err) => { if (err) t.equal(err.code, 'other') @@ -309,7 +344,7 @@ function buildTests (test, sync) { const fd = fs.openSync(dest, 'w') const stream = new SonicBoom({ fd, - sync: false, + sync, // to trigger write without calling flush minLength: 1 @@ -319,17 +354,23 @@ function buildTests (test, sync) { t.pass('ready emitted') }) - fakeFs.write = function (...args) { - stream.flush((err) => { - if (err) t.fail(err) - else t.pass('flush cb called') - }) - - t.pass('fake fs.write called') - fakeFs.write = fs.write - return fakeFs.write(...args) + function onWriteOrWriteSync (originalFn) { + return function (...args) { + stream.flush((err) => { + if (err) t.fail(err) + else t.pass('flush cb called') + }) + + t.pass(`fake fs.${originalFn.name} called`) + fakeFs[originalFn.name] = originalFn + return originalFn(...args) + } } + // only one is called depends on sync + fakeFs.write = onWriteOrWriteSync(fs.write) + fakeFs.writeSync = onWriteOrWriteSync(fs.writeSync) + t.ok(stream.write('hello world\n')) }) } From 924a029054bf6a78bf6ca136e662b63c3b6f1df7 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 5 Oct 2023 12:52:22 +0300 Subject: [PATCH 5/5] Add test for flush callback type --- types/index.test-d.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 39687bf..564f7f2 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -21,6 +21,7 @@ expectType( new SonicBoomCjsNamed({ fd: 1})); expectType(sonic.write('hello sonic\n')); sonic.flush(); +sonic.flush((err?: Error) => undefined); sonic.flushSync();