Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readline: add support for async iteration #18904

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions doc/api/readline.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,38 @@ rl.write(null, { ctrl: true, name: 'u' });
The `rl.write()` method will write the data to the `readline` `Interface`'s
`input` *as if it were provided by the user*.

### rl[@@asyncIterator]
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

Returns an [AsyncIterator][async-iterator] to fully consume the stream.

```js
const readline = require('readline');
const fs = require('fs');

async function processLineByLine(readable) {
readable.setEncoding('utf8');
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

for await (const line of rli) {
console.log(line);
}
}

processLineByLine(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break` or a `throw`, the stream will be
destroyed. In other terms, iterating over a stream will consume the stream
fully.

## readline.clearLine(stream, dir)
<!-- YAML
added: v0.7.7
Expand Down Expand Up @@ -527,6 +559,29 @@ rl.on('line', (line) => {
});
```

> Stability: 1 - Experimental

Another way is to use async [for-await-of][for-await-of] iteration statement:

```js
const readline = require('readline');
const fs = require('fs');

async function processLineByLine(readable) {
readable.setEncoding('utf8');
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

for await (const line of rli) {
console.log(line);
}
}

processLineByLine(fs.createReadStream('file')).catch(console.error);
```

[`'SIGCONT'`]: readline.html#readline_event_sigcont
[`'SIGTSTP'`]: readline.html#readline_event_sigtstp
[`process.stdin`]: process.html#process_process_stdin
Expand All @@ -535,3 +590,6 @@ rl.on('line', (line) => {
[TTY]: tty.html
[Writable]: stream.html#stream_writable_streams
[reading files]: #readline_example_read_file_stream_line_by_line
[async-iterator]: https://github.com/tc39/proposal-async-iteration
[for-await-of]: https://github.com/tc39/proposal-async-iteration#the-async-iteration-statement-for-await-of

3 changes: 3 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,8 @@ fully. The stream will be read in chunks of size equal to the `highWaterMark`
option. In the code example above, data will be in a single chunk if the file
has less then 64kb of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].
Use [readline][readline-async-iterator] if there is a need for line-by-line
async file iteration.

### Duplex and Transform Streams

Expand Down Expand Up @@ -2518,3 +2520,4 @@ contain multi-byte characters.
[readable-destroy]: #stream_readable_destroy_error
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[readline-async-iterator]: readline.html#readline_rl_asynciterator
137 changes: 137 additions & 0 deletions lib/internal/readline/async_iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
'use strict';

const kReadlineInterface = Symbol('readlineInterface');
const {
kLastResolve,
kLastReject,
kError,
kEnded,
kLastPromise,
kHandlePromise,
kStream,
onEnd,
onError,
wrapForNext,
AsyncIteratorRecord,
BaseAsyncIterator
} = require('internal/streams/base_async_iterator');

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kReadlineInterface].read();
// we defer if data is null
// we can be expecting either 'end' or
// 'error'
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
}
}
}

function writeToBuffer(iter) {
const data = iter[kStream].read();
if (data !== null) {
iter[kReadlineInterface]._normalWrite(data);
}
process.nextTick(readAndResolve, iter);
}

function onReadable(iter) {
process.nextTick(writeToBuffer, iter);
}

const ReadlineAsyncIterator =
class ReadlineAsyncIterator extends BaseAsyncIterator {
constructor(readline_interface) {
super();
this[kReadlineInterface] = readline_interface;
this[kStream] = readline_interface.input;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

this[kStream].on('readable', onReadable.bind(null, this));
this[kStream].on('end', onEnd.bind(null, this));
this[kStream].on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kReadlineInterface].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}

get stream() {
return this[kStream];
}

next() {
// if we have detected an error in the meanwhile
// reject straight away
const error = this[kError];
if (error !== null) {
return Promise.reject(error);
}

if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
}

// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
const lastPromise = this[kLastPromise];
let promise;

if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
const data = this[kReadlineInterface].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
}

promise = new Promise(this[kHandlePromise]);
}

this[kLastPromise] = promise;

return promise;
}

return() {
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
this[kStream].destroy(null, (err) => {
if (err) {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
});
});
}
};

module.exports = ReadlineAsyncIterator;
Loading