diff --git a/lib/Dicer.js b/lib/Dicer.js index e385a94..5e0b128 100644 --- a/lib/Dicer.js +++ b/lib/Dicer.js @@ -55,40 +55,6 @@ class Dicer extends Writable { }); } - emit(ev) { - if (ev !== 'finish' || this._realFinish) { - Writable.prototype.emit.apply(this, arguments); - return; - } - - if (this._finished) - return; - - process.nextTick(() => { - this.emit('error', new Error('Unexpected end of multipart data')); - - if (this._part && !this._ignoreData) { - const type = (this._isPreamble ? 'Preamble' : 'Part'); - this._part.emit( - 'error', - new Error(`${type} terminated early due to ` - + 'unexpected end of multipart data') - ); - this._part.push(null); - process.nextTick(() => { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - }); - return; - } - - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - }); - } - _write(data, encoding, cb) { // Ignore unexpected data (e.g. extra trailer data after finished) if (!this._hparser && !this._bparser) @@ -97,9 +63,7 @@ class Dicer extends Writable { if (this._headerFirst && this._isPreamble) { if (!this._part) { this._part = new PartStream(this._partOpts); - if (this._events.preamble) - this.emit('preamble', this._part); - else + if (!this.emit('preamble', this._part)) ignore(this); } const r = this._hparser.push(data); @@ -123,6 +87,32 @@ class Dicer extends Writable { cb(); } + _final(cb) { + if (this._finished) { + if (this._parts !== 0) { + this._pause = true; + this._cb = cb; + return; + } + + cb(); + return; + } + + if (this._part && !this._ignoreData) { + const type = (this._isPreamble ? 'Preamble' : 'Part'); + this._part.emit( + 'error', + new Error(`${type} terminated early due to ` + + 'unexpected end of multipart data') + ); + this._part.push(null); + ignore(this); + } + + cb(new Error('Unexpected end of multipart data')); + } + reset() { this._part = undefined; this._bparser = undefined; @@ -154,15 +144,13 @@ function onInfo(isMatch, data, start, end) { } } if (this._dashes === 2) { - if ((start + i) < end && this._events.trailer) + if ((start + i) < end && this.listenerCount('trailer')) this.emit('trailer', data.slice(start + i, end)); this.reset(); this._finished = true; // No more parts will be added if (this._parts === 0) { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; + unpause(this); } } if (this._dashes) @@ -176,9 +164,7 @@ function onInfo(isMatch, data, start, end) { unpause(this); }; ev = this._isPreamble ? 'preamble' : 'part'; - if (this._events[ev]) - this.emit(ev, this._part); - else + if (!this.emit(ev, this._part)) ignore(this); if (!this._isPreamble) this._inHeader = true; @@ -206,13 +192,7 @@ function onInfo(isMatch, data, start, end) { ++this._parts; this._part.on('end', () => { if (--this._parts === 0) { - if (this._finished) { - this._realFinish = true; - this.emit('finish'); - this._realFinish = false; - } else { - unpause(this); - } + unpause(this); } }); } diff --git a/test/test-multipart.js b/test/test-multipart.js index d696c92..9a62e7e 100644 --- a/test/test-multipart.js +++ b/test/test-multipart.js @@ -133,7 +133,10 @@ function next() { }); }).on('error', (err) => { error = err; - }).on('finish', () => { + onFinish(); + }).on('finish', onFinish); + + function onFinish() { assert(finishes++ === 0, makeMsg(v.what, 'finish emitted multiple times')); if (v.dicerError) { @@ -242,7 +245,7 @@ function next() { } ++t; next(); - }); + } fs.createReadStream(fixtureBase + '/original').pipe(dicer); } diff --git a/test/test-pipeline.js b/test/test-pipeline.js new file mode 100644 index 0000000..6186723 --- /dev/null +++ b/test/test-pipeline.js @@ -0,0 +1,36 @@ +'use strict'; + +const assert = require('assert'); +const fs = require('fs'); +const path = require('path'); +const { inspect } = require('util'); +const { Readable } = require('stream'); +const { pipeline } = require('stream/promises'); +const Dicer = require('..'); + +async function main() { + const r = new Readable({ read() {} }); + const d = new Dicer({ boundary: 'a' }); + + d.on('part', async (part) => { + part.resume(); + }); + + r.push(`--a\r\nA: 1\r\nB: 1\r\n\r\n123\r\n--a\r\n\r\n456\r\n--a--\r\n`); + setImmediate(() => { + r.push(null); + }); + + const timer = setTimeout(() => { + throw new Error('Should be canceled'); + }, 2000); + + await pipeline(r, d); + + clearTimeout(timer); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +});