Skip to content

Commit

Permalink
stream: support abortsignal in constructor
Browse files Browse the repository at this point in the history
PR-URL: #36431
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
benjamingr committed Dec 10, 2020
1 parent 8065883 commit 040a27a
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 6 deletions.
46 changes: 46 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,9 @@ method.
#### `new stream.Writable([options])`
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/30623
description: Change `autoDestroy` option default to `true`.
Expand Down Expand Up @@ -1985,6 +1988,7 @@ changes:
[`stream._construct()`][writable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
```js
Expand Down Expand Up @@ -2028,6 +2032,27 @@ const myWritable = new Writable({
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the writeable stream.

```js
const { Writable } = require('stream');

const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal
});
// Later, abort the operation closing the stream
controller.abort();

```
#### `writable._construct(callback)`
<!-- YAML
added: v15.0.0
Expand Down Expand Up @@ -2276,6 +2301,9 @@ constructor and implement the [`readable._read()`][] method.
#### `new stream.Readable([options])`
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/30623
description: Change `autoDestroy` option default to `true`.
Expand Down Expand Up @@ -2306,6 +2334,7 @@ changes:
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
```js
Expand Down Expand Up @@ -2346,6 +2375,23 @@ const myReadable = new Readable({
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the readable created.

```js
const fs = require('fs');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal
});
// Later, abort the operation closing the stream
controller.abort();
```

#### `readable._construct(callback)`
<!-- YAML
added: v15.0.0
Expand Down
15 changes: 10 additions & 5 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the steam
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
const validateAbortSignal = (signal, name) => {
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
if (typeof signal !== 'object' ||
!('aborted' in signal)) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
};
Expand All @@ -23,11 +22,17 @@ function isStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports = function addAbortSignal(signal, stream) {
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
};
module.exports.addAbortSignalNoValidate = function(signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream;
}
const onAbort = () => {
stream.destroy(new AbortError());
};
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
Expand Down Expand Up @@ -192,6 +196,8 @@ function Readable(options) {

if (typeof options.construct === 'function')
this._construct = options.construct;
if (options.signal && !isDuplex)
addAbortSignalNoValidate(options.signal, this);
}

Stream.call(this, options);
Expand Down
7 changes: 7 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

const {
getHighWaterMark,
getDefaultHighWaterMark
Expand Down Expand Up @@ -263,6 +268,8 @@ function Writable(options) {

if (typeof options.construct === 'function')
this._construct = options.construct;
if (options.signal)
addAbortSignalNoValidate(options.signal, this);
}

Stream.call(this, options);
Expand Down
3 changes: 2 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;

function lazyLoadPromises() {
Expand Down
17 changes: 17 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,20 @@ const assert = require('assert');
});
duplex.on('close', common.mustCall());
}
{
// Check abort signal
const controller = new AbortController();
const { signal } = controller;
const duplex = new Duplex({
write(chunk, enc, cb) { cb(); },
read() {},
signal,
});
let count = 0;
duplex.on('error', common.mustCall((e) => {
assert.strictEqual(count++, 0); // Ensure not called twice
assert.strictEqual(e.name, 'AbortError');
}));
duplex.on('close', common.mustCall());
controller.abort();
}
16 changes: 16 additions & 0 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,22 @@ const assert = require('assert');
read.on('data', common.mustNotCall());
}

{
const controller = new AbortController();
const read = new Readable({
signal: controller.signal,
read() {
this.push('asd');
},
});

read.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
}));
controller.abort();
read.on('data', common.mustNotCall());
}

{
const controller = new AbortController();
const read = addAbortSignal(controller.signal, new Readable({
Expand Down
15 changes: 15 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,18 @@ const assert = require('assert');
write.write('asd');
ac.abort();
}

{
const ac = new AbortController();
const write = new Writable({
signal: ac.signal,
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write.write('asd');
ac.abort();
}

0 comments on commit 040a27a

Please sign in to comment.