Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v13.x backport] stream: async iterator improvements #32174

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const {
} = primordials;

const finished = require('internal/streams/end-of-stream');
const destroyImpl = require('internal/streams/destroy');

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
Expand All @@ -22,15 +23,6 @@ const kStream = Symbol('stream');

let Readable;

function destroy(stream, err) {
// request.destroy just do .end - .abort is what we want
if (typeof stream.abort === 'function') return stream.abort();
if (stream.req &&
typeof stream.req.abort === 'function') return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

function createIterResult(value, done) {
return { value, done };
}
Expand Down Expand Up @@ -92,7 +84,7 @@ function finish(self, err) {
resolve(createIterResult(undefined, true));
}
});
destroy(stream, err);
destroyImpl.destroyer(stream, err);
});
}

Expand Down Expand Up @@ -172,7 +164,7 @@ const createReadableStreamAsyncIterator = (stream) => {

const src = stream;
stream = new Readable({ objectMode: true }).wrap(src);
finished(stream, (err) => destroy(src, err));
finished(stream, (err) => destroyImpl.destroyer(src, err));
}

const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
Expand Down
13 changes: 13 additions & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,21 @@ function errorOrDestroy(stream, err) {
stream.emit('error', err);
}

function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

module.exports = {
destroyer,
destroy,
undestroy,
errorOrDestroy
Expand Down
39 changes: 6 additions & 33 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
let eos;

const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
Expand All @@ -28,14 +29,6 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

function destroyStream(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

function destroyer(stream, reading, writing, final, callback) {
callback = once(callback);
let destroyed = false;
Expand All @@ -46,15 +39,15 @@ function destroyer(stream, reading, writing, final, callback) {
destroyed = true;
const readable = stream.readable || isRequest(stream);
if (err || !final || !readable) {
destroyStream(stream, err);
destroyImpl.destroyer(stream, err);
}
callback(err);
});

return (err) => {
if (destroyed) return;
destroyed = true;
destroyStream(stream, err);
destroyImpl.destroyer(stream, err);
callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}
Expand Down Expand Up @@ -97,39 +90,19 @@ function makeAsyncIterable(val) {
return val;
} else if (isReadable(val)) {
// Legacy streams are not Iterable.
return _fromReadable(val);
return fromReadable(val);
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
}

async function* _fromReadable(val) {
async function* fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

try {
if (typeof val.read !== 'function') {
// createReadableStreamAsyncIterator does not support
// v1 streams. Convert it into a v2 stream.

if (!PassThrough) {
PassThrough = require('_stream_passthrough');
}

const pt = new PassThrough();
val
.on('error', (err) => pt.destroy(err))
.pipe(pt);
yield* createReadableStreamAsyncIterator(pt);
} else {
yield* createReadableStreamAsyncIterator(val);
}
} finally {
destroyStream(val);
}
yield* createReadableStreamAsyncIterator(val);
}

async function pump(iterable, writable, finish) {
Expand Down