Skip to content

Commit

Permalink
readline: add support for async iteration
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan Filenko <ivan.filenko@protonmail.com>
Fixes: nodejs#18603
Refs: nodejs#18904
PR-URL: nodejs#23916
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
2 people authored and Trott committed Nov 20, 2018
1 parent 2742f38 commit 2a7432d
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 3 deletions.
70 changes: 68 additions & 2 deletions doc/api/readline.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,43 @@ rl.write(null, { ctrl: true, name: 'u' });
The `rl.write()` method will write the data to the `readline` `Interface`'s
`input` *as if it were provided by the user*.

### rl\[Symbol.asyncIterator\]()
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* Returns: {AsyncIterator}

Create an `AsyncIterator` object that iterates through each line in the input
stream as a string. This method allows asynchronous iteration of
`readline.Interface` objects through `for`-`await`-`of` loops.

Errors in the input stream are not forwarded.

If the loop is terminated with `break`, `throw`, or `return`,
[`rl.close()`][] will be called. In other words, iterating over a
`readline.Interface` will always consume the input stream fully.

A caveat with using this experimental API is that the performance is
currently not on par with the traditional `'line'` event API, and thus it is
not recommended for performance-sensitive applications. We expect this
situation to improve in the future.

```js
async function processLineByLine() {
const rl = readline.createInterface({
// ...
});

for await (const line of rl) {
// Each line in the readline input will be successively available here as
// `line`.
}
}
```

## readline.clearLine(stream, dir)
<!-- YAML
added: v0.7.7
Expand Down Expand Up @@ -517,12 +554,38 @@ rl.on('line', (line) => {

## Example: Read File Stream Line-by-Line

A common use case for `readline` is to consume input from a filesystem
[Readable][] stream one line at a time:
A common use case for `readline` is to consume an input file one line at a
time. The easiest way to do so is leveraging the [`fs.ReadStream`][] API as
well as a `for`-`await`-`of` loop:

```js
const fs = require('fs');
const readline = require('readline');

async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');

const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.

for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}

processLineByLine();
```

Alternatively, one could use the [`'line'`][] event:

```js
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
Expand All @@ -536,8 +599,11 @@ rl.on('line', (line) => {

[`'SIGCONT'`]: readline.html#readline_event_sigcont
[`'SIGTSTP'`]: readline.html#readline_event_sigtstp
[`'line'`]: #readline_event_line
[`fs.ReadStream`]: fs.html#fs_class_fs_readstream
[`process.stdin`]: process.html#process_process_stdin
[`process.stdout`]: process.html#process_process_stdout
[`rl.close()`]: #readline_rl_close
[Readable]: stream.html#stream_readable_streams
[TTY]: tty.html
[Writable]: stream.html#stream_writable_streams
Expand Down
43 changes: 43 additions & 0 deletions lib/readline.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
ERR_INVALID_OPT_VALUE
} = require('internal/errors').codes;
const { debug, inherits } = require('util');
const { emitExperimentalWarning } = require('internal/util');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const {
Expand All @@ -54,11 +55,16 @@ const {
// Lazy load StringDecoder for startup performance.
let StringDecoder;

// Lazy load Readable for startup performance.
let Readable;

const kHistorySize = 30;
const kMincrlfDelay = 100;
// \r\n, \n, or \r followed by something other than \n
const lineEnding = /\r?\n|\r(?!\n)/;

const kLineObjectStream = Symbol('line object stream');

const KEYPRESS_DECODER = Symbol('keypress-decoder');
const ESCAPE_DECODER = Symbol('escape-decoder');

Expand Down Expand Up @@ -190,6 +196,8 @@ function Interface(input, output, completer, terminal) {
self._refreshLine();
}

this[kLineObjectStream] = undefined;

if (!this.terminal) {
function onSelfCloseWithoutTerminal() {
input.removeListener('data', ondata);
Expand Down Expand Up @@ -1019,6 +1027,41 @@ Interface.prototype._ttyWrite = function(s, key) {
}
};

Interface.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('readline Interface [Symbol.asyncIterator]');

if (this[kLineObjectStream] === undefined) {
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
}
});
const lineListener = (input) => {
if (!readable.push(input)) {
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
}

return this[kLineObjectStream][Symbol.asyncIterator]();
};

/**
* accepts a readable Stream instance and makes it emit "keypress" events
*/
Expand Down
48 changes: 48 additions & 0 deletions test/parallel/test-readline-async-iterators-backpressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');
const readline = require('readline');

const CONTENT = 'content';
const TOTAL_LINES = 18;

(async () => {
const readable = new Readable({ read() {} });
readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES));

const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const it = rli[Symbol.asyncIterator]();
const highWaterMark = it.stream.readableHighWaterMark;

// For this test to work, we have to queue up more than the number of
// highWaterMark items in rli. Make sure that is the case.
assert(TOTAL_LINES > highWaterMark);

let iterations = 0;
let readableEnded = false;
for await (const line of it) {
assert.strictEqual(readableEnded, false);

assert.strictEqual(line, CONTENT);

const expectedPaused = TOTAL_LINES - iterations > highWaterMark;
assert.strictEqual(readable.isPaused(), expectedPaused);

iterations += 1;

// We have to end the input stream asynchronously for back pressure to work.
// Only end when we have reached the final line.
if (iterations === TOTAL_LINES) {
readable.push(null);
readableEnded = true;
}
}

assert.strictEqual(iterations, TOTAL_LINES);
})().then(common.mustCall());
78 changes: 78 additions & 0 deletions test/parallel/test-readline-async-iterators-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'use strict';

const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const filename = join(tmpdir.path, 'test.txt');

const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];

async function testSimpleDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
break;
}

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(1);

assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

async function testMutualDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(2);

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}

assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

testSimpleDestroy().then(testMutualDestroy).then(common.mustCall());
77 changes: 77 additions & 0 deletions test/parallel/test-readline-async-iterators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict';

const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const filename = join(tmpdir.path, 'test.txt');

const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];

async function testSimple() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
}

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
assert.deepStrictEqual(iteratedLines, expectedLines);
assert.strictEqual(iteratedLines.join(''), fileContent.replace(/\n/gm, ''));
}
}

async function testMutual() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
const iteratedLines = [];
let iterated = false;
for await (const k of rli) {
// This outer loop should only iterate once.
assert.strictEqual(iterated, false);
iterated = true;

iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

testSimple().then(testMutual).then(common.mustCall());
2 changes: 1 addition & 1 deletion tools/doc/type-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const customTypesMap = {

'this': `${jsDocPrefix}Reference/Operators/this`,

'AsyncIterator': 'https://github.com/tc39/proposal-async-iteration',
'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',

'bigint': 'https://github.com/tc39/proposal-bigint',

Expand Down

0 comments on commit 2a7432d

Please sign in to comment.