doc: piping from async generators using pipeline() #33992

Closed · wants to merge 1 commit
87 changes: 20 additions & 67 deletions doc/api/stream.md
@@ -2886,80 +2886,33 @@ readable.on('data', (chunk) => {

#### Piping to writable streams from async iterators

Removed by this patch:

In the scenario of writing to a writable stream from an async iterator, ensure
the correct handling of backpressure and errors.

```js
const { once } = require('events');
const stream = require('stream');
const util = require('util');
const fs = require('fs');

const finished = util.promisify(stream.finished);

const writable = fs.createWriteStream('./file');

function drain(writable) {
  if (writable.destroyed) {
    return Promise.reject(new Error('premature close'));
  }
  return Promise.race([
    once(writable, 'drain'),
    once(writable, 'close')
      .then(() => Promise.reject(new Error('premature close')))
  ]);
}

async function pump(iterable, writable) {
  for await (const chunk of iterable) {
    // Handle backpressure on write().
    if (!writable.write(chunk)) {
      await drain(writable);
    }
  }
  writable.end();
}

(async function() {
  // Ensure completion without errors.
  await Promise.all([
    pump(iterable, writable),
    finished(writable)
  ]);
})();
```

In the above, errors on `write()` would be caught and thrown by the
`once()` listener for the `'drain'` event, since `once()` also handles the
`'error'` event. To ensure that the write stream completes without errors,
it is safer to use the `finished()` method as above rather than a `once()`
listener for the `'finish'` event. In some cases an `'error'` event may be
emitted by the writable stream after `'finish'`, and because `once()` removes
its `'error'` handler when it handles `'finish'`, that error could go
unhandled.
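
To illustrate the difference, here is a minimal sketch (the helper functions
are made up for illustration, not part of the documented example) contrasting
a `once(writable, 'finish')` wait with the promisified `finished()`:

```js
const { once } = require('events');
const stream = require('stream');
const util = require('util');

const finished = util.promisify(stream.finished);

// Hypothetical helpers used only to contrast the two waiting strategies.
async function waitUnsafe(writable) {
  // Resolves on 'finish', but once() removes its 'error' listener at that
  // point, so an 'error' emitted afterwards can go unhandled.
  await once(writable, 'finish');
}

async function waitSafe(writable) {
  // finished() also rejects on 'error' and premature close, making it the
  // safer way to wait for the writable stream to complete.
  await finished(writable);
}
```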

Alternatively, the readable stream could be wrapped with `Readable.from()` and
then piped via `.pipe()`:

```js
const stream = require('stream');
const util = require('util');
const fs = require('fs');

const { Readable } = stream;
const finished = util.promisify(stream.finished);

const writable = fs.createWriteStream('./file');

(async function() {
  const readable = Readable.from(iterable);
  readable.pipe(writable);
  // Ensure completion without errors.
  await finished(writable);
})();
```

Or, using `stream.pipeline()` to pipe streams:

```js
const stream = require('stream');
const util = require('util');
const fs = require('fs');

const pipeline = util.promisify(stream.pipeline);

const writable = fs.createWriteStream('./file');

(async function() {
  await pipeline(iterable, writable);
})();
```
Added by this patch:

When writing to a writable stream from an async iterator, ensure correct
handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:

```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');

const writable = fs.createWriteStream('./file');

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err);
  } else {
    console.log(value, 'value returned');
  }
});

// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
  .then((value) => {
    console.log(value, 'value returned');
  })
  .catch(console.error);
```
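
As a usage note, `iterator` in the example above is whatever async iterable is
being piped. A minimal sketch, assuming an async generator as the source (the
generator and file name below are hypothetical, not part of the patch):

```js
const { pipeline } = require('stream');
const fs = require('fs');

// Hypothetical async generator producing the chunks to write.
async function* generate() {
  yield 'hello';
  yield 'world';
}

// pipeline() consumes the async iterable, applying backpressure and
// forwarding errors to the callback.
pipeline(generate(), fs.createWriteStream('./file'), (err) => {
  if (err) {
    console.error(err);
  } else {
    console.log('done');
  }
});
```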

<!--type=misc-->