Skip to content

Commit

Permalink
lib: use Writable#_final for 'finish' tracking
Browse files Browse the repository at this point in the history
Instead overriding `emit` method and using `this._realFinish` to detect
unexpected end of data - implement `_final` method and track the logic
there.

The only behavior change from the use of `_final` is that we would no
longer emit `finish` when the input data is terminated early. Instead we
would emit `error` as before and stop the processing. Note that this is
the behavior provided by `stream.Writable`, and it is thus conformant
with the specification.

Additionally, replace uses of private `_events` property with either
straight `emit()` with return value check, or `listenerCount()` in the
situations where there might be an overhead from the constructed event
arguments.

Replace `emit('error', error)` with `destroy(error)` calls as well, as
otherwise the behavior is undefined.

Fix: mscdex#26
  • Loading branch information
indutny authored and indutny-signal committed Mar 2, 2022
1 parent c64ada8 commit abd843e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 75 deletions.
97 changes: 38 additions & 59 deletions lib/Dicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,8 @@ class Dicer extends Writable {
this._part.emit('header', header);
});
this._hparser.on('error', (err) => {
if (this._part && !this._ignoreData) {
this._part.emit('error', err);
this._part.push(null);
}
});
}

emit(ev) {
if (ev !== 'finish' || this._realFinish) {
Writable.prototype.emit.apply(this, arguments);
return;
}

if (this._finished)
return;

process.nextTick(() => {
this.emit('error', new Error('Unexpected end of multipart data'));

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.emit(
'error',
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
this._part.push(null);
process.nextTick(() => {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
});
return;
}

this._realFinish = true;
this.emit('finish');
this._realFinish = false;
if (this._part && !this._ignoreData)
this._part.destroy(err);
});
}

Expand All @@ -97,9 +61,7 @@ class Dicer extends Writable {
if (this._headerFirst && this._isPreamble) {
if (!this._part) {
this._part = new PartStream(this._partOpts);
if (this._events.preamble)
this.emit('preamble', this._part);
else
if (!this.emit('preamble', this._part))
ignore(this);
}
const r = this._hparser.push(data);
Expand All @@ -123,6 +85,34 @@ class Dicer extends Writable {
cb();
}

_final(cb) {
if (this._finished) {
if (this._parts !== 0) {
this._pause = true;
this._cb = cb;
return;
}

cb();
return;
}

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.destroy(
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
ignore(this);
}

// Node <= 12 compatibility, otherwise `part`'s 'error'/'end' have no chance
// to emit.
process.nextTick(() => {
cb(new Error('Unexpected end of multipart data'));
});
}

reset() {
this._part = undefined;
this._bparser = undefined;
Expand Down Expand Up @@ -154,16 +144,14 @@ function onInfo(isMatch, data, start, end) {
}
}
if (this._dashes === 2) {
if ((start + i) < end && this._events.trailer)
if ((start + i) < end && this.listenerCount('trailer'))
this.emit('trailer', data.slice(start + i, end));
this.reset();
this._finished = true;
// No more parts will be added
if (this._parts === 0) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
}
if (this._parts === 0)
unpause(this);

}
if (this._dashes)
return;
Expand All @@ -176,9 +164,7 @@ function onInfo(isMatch, data, start, end) {
unpause(this);
};
ev = this._isPreamble ? 'preamble' : 'part';
if (this._events[ev])
this.emit(ev, this._part);
else
if (!this.emit(ev, this._part))
ignore(this);
if (!this._isPreamble)
this._inHeader = true;
Expand All @@ -205,15 +191,8 @@ function onInfo(isMatch, data, start, end) {
} else {
++this._parts;
this._part.on('end', () => {
if (--this._parts === 0) {
if (this._finished) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
} else {
unpause(this);
}
}
if (--this._parts === 0)
unpause(this);
});
}
this._part.push(null);
Expand Down
49 changes: 33 additions & 16 deletions test/test-multipart.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ function next() {
header: undefined
};

const onPreambleEnd = () => {
if (preamble.body)
preamble.body = Buffer.concat(preamble.body, preamble.bodylen);
if (preamble.body || preamble.header)
state.preamble = preamble;
};

p.on('header', (h) => {
preamble.header = h;
if (v.setBoundary)
Expand All @@ -100,12 +107,8 @@ function next() {
preamble.bodylen += data.length;
}).on('error', (err) => {
preamble.error = err;
}).on('end', () => {
if (preamble.body)
preamble.body = Buffer.concat(preamble.body, preamble.bodylen);
if (preamble.body || preamble.header)
state.preamble = preamble;
});
onPreambleEnd();
}).on('end', onPreambleEnd);
});
dicer.on('part', (p) => {
const part = {
Expand All @@ -115,6 +118,12 @@ function next() {
header: undefined
};

const onPartEnd = () => {
if (part.body)
part.body = Buffer.concat(part.body, part.bodylen);
state.parts.push(part);
};

p.on('header', (h) => {
part.header = h;
}).on('data', (data) => {
Expand All @@ -126,15 +135,23 @@ function next() {
}).on('error', (err) => {
part.error = err;
++partErrors;
}).on('end', () => {
if (part.body)
part.body = Buffer.concat(part.body, part.bodylen);
state.parts.push(part);
});
}).on('error', (err) => {
error = err;
}).on('finish', () => {
assert(finishes++ === 0, makeMsg(v.what, 'finish emitted multiple times'));
onPartEnd();
}).on('end', onPartEnd);
}).on('error', onFinish).on('finish', onFinish);

function onFinish(err) {
if (err) {
assert(
error === undefined,
makeMsg(v.what, 'error emitted multiple times')
);
error = err;
}

// Node <= 12 emits both 'error' and 'end', while Node > 14 emits only
// 'error'.
if (finishes++ > 0)
return;

if (v.dicerError) {
assert(error !== undefined, makeMsg(v.what, 'Expected error'));
Expand Down Expand Up @@ -242,7 +259,7 @@ function next() {
}
++t;
next();
});
}

fs.createReadStream(fixtureBase + '/original').pipe(dicer);
}
Expand Down
30 changes: 30 additions & 0 deletions test/test-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

const assert = require('assert');

const { Readable, pipeline } = require('stream');
const Dicer = require('..');

const r = new Readable({ read() {} });
const d = new Dicer({ boundary: 'a' });

let isFinished = false;

d.on('part', async (part) => {
part.resume();
});

r.push('--a\r\nA: 1\r\nB: 1\r\n\r\n123\r\n--a\r\n\r\n456\r\n--a--\r\n');
setImmediate(() => {
r.push(null);
});

pipeline(r, d, (error) => {
assert(isFinished === false, 'Double-invocation of pipeline callback');
assert(error === undefined, 'Unexpected pipeline error');
isFinished = true;
});

process.on('exit', () => {
assert(isFinished === true, 'Should finish before exiting');
});

0 comments on commit abd843e

Please sign in to comment.