Skip to content

Commit

Permalink
stream: expose DuplexPair API
Browse files Browse the repository at this point in the history
PR-URL: #34111
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
awwright authored and marco-ippolito committed Aug 19, 2024
1 parent 6a5120f commit 7c21bb9
Show file tree
Hide file tree
Showing 32 changed files with 230 additions and 123 deletions.
32 changes: 30 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
and [`stream.addAbortSignal()`][].
[`stream.duplexPair()`][],
[`stream.pipeline()`][],
[`stream.finished()`][]
[`stream.Readable.from()`][], and
[`stream.addAbortSignal()`][].

### Streams Promises API

Expand Down Expand Up @@ -2700,6 +2703,30 @@ unless `emitClose` is set in false.
Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

#### `stream.duplexPair([options])`

<!-- YAML
added: REPLACEME
-->

* `options` {Object} A value to pass to both [`Duplex`][] constructors,
to set options such as buffering.
* Returns: {Array} of two [`Duplex`][] instances.

The utility function `duplexPair` returns an Array with two items,
each being a `Duplex` stream connected to the other side:

```js
const [ sideA, sideB ] = duplexPair();
```

Whatever is written to one stream is made readable on the other. It provides
behavior analogous to a network connection, where the data written by the client
becomes readable by the server, and vice-versa.

The Duplex streams are symmetrical; one or the other may be used without any
difference in behavior.

### `stream.finished(stream[, options], callback)`

<!-- YAML
Expand Down Expand Up @@ -4872,6 +4899,7 @@ contain multi-byte characters.
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.duplexPair()`]: #streamduplexpairoptions
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options
[`stream.pipeline()`]: #streampipelinesource-transforms-destination-callback
Expand Down
62 changes: 62 additions & 0 deletions lib/internal/streams/duplexpair.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict';
const {
Symbol,
} = primordials;

const { Duplex } = require('stream');
const assert = require('internal/assert');

const kCallback = Symbol('Callback');
const kInitOtherSide = Symbol('InitOtherSide');

class DuplexSide extends Duplex {
#otherSide = null;

constructor(options) {
super(options);
this[kCallback] = null;
this.#otherSide = null;
}

[kInitOtherSide](otherSide) {
// Ensure this can only be set once, to enforce encapsulation.
if (this.#otherSide === null) {
this.#otherSide = otherSide;
} else {
assert(this.#otherSide === null);
}
}

_read() {
const callback = this[kCallback];
if (callback) {
this[kCallback] = null;
callback();
}
}

_write(chunk, encoding, callback) {
assert(this.#otherSide !== null);
assert(this.#otherSide[kCallback] === null);
if (chunk.length === 0) {
process.nextTick(callback);
} else {
this.#otherSide.push(chunk);
this.#otherSide[kCallback] = callback;
}
}

_final(callback) {
this.#otherSide.on('end', callback);
this.#otherSide.push(null);
}
}

function duplexPair(options) {
const side0 = new DuplexSide(options);
const side1 = new DuplexSide(options);
side0[kInitOtherSide](side1);
side1[kInitOtherSide](side0);
return [ side0, side1 ];
}
module.exports = duplexPair;
1 change: 1 addition & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.duplexPair = require('internal/streams/duplexpair');
Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Expand Down
9 changes: 0 additions & 9 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ This directory contains modules used to test the Node.js implementation.
* [CPU Profiler module](#cpu-profiler-module)
* [Debugger module](#debugger-module)
* [DNS module](#dns-module)
* [Duplex pair helper](#duplex-pair-helper)
* [Environment variables](#environment-variables)
* [Fixtures module](#fixtures-module)
* [Heap dump checker module](#heap-dump-checker-module)
Expand Down Expand Up @@ -669,14 +668,6 @@ Reads a Domain String and returns a Buffer containing the domain.
Takes in a parsed Object and writes its fields to a DNS packet as a Buffer
object.

## Duplex pair helper

The `common/duplexpair` module exports a single function `makeDuplexPair`,
which returns an object `{ clientSide, serverSide }` where each side is a
`Duplex` stream connected to the other side.

There is no difference between client or server side beyond their names.

## Environment variables

The behavior of the Node.js test suite can be altered using the following
Expand Down
48 changes: 0 additions & 48 deletions test/common/duplexpair.js

This file was deleted.

1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ if (common.isMainThread) {
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/duplexpair',
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-gc-tls-external-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const makeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');
const onGC = require('../common/ongc');
const assert = require('assert');
const tls = require('tls');
Expand Down Expand Up @@ -37,7 +37,7 @@ function connect() {
return;
}

const { clientSide, serverSide } = makeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const tlsSocket = tls.connect({ socket: clientSide });
tlsSocket.on('error', common.mustCall(connect));
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-agent-domain-reused-gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const common = require('../common');
const http = require('http');
const async_hooks = require('async_hooks');
const makeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Regression test for https://github.com/nodejs/node/issues/30122
// When a domain is attached to an http Agent’s ReusedHandle object, that
Expand Down Expand Up @@ -36,7 +36,7 @@ async_hooks.createHook({
// attached to too many objects that use strong references (timers, the network
// socket handle, etc.) and wrap the client side in a JSStreamSocket so we don’t
// have to implement the whole _handle API ourselves.
const { serverSide, clientSide } = makeDuplexPair();
const [ serverSide, clientSide ] = duplexPair();
const JSStreamSocket = require('internal/js_stream_socket');
const wrappedClientSide = new JSStreamSocket(clientSide);

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-http-generic-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test 1: Simple HTTP test, no keep-alive.
{
Expand All @@ -13,7 +13,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

const req = http.request({
Expand All @@ -37,7 +37,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}, 2));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

function doRequest(cb) {
Expand Down Expand Up @@ -77,7 +77,7 @@ const MakeDuplexPair = require('../common/duplexpair');
});
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand Down Expand Up @@ -117,7 +117,7 @@ const MakeDuplexPair = require('../common/duplexpair');

}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand All @@ -143,7 +143,7 @@ const MakeDuplexPair = require('../common/duplexpair');
{
const server = http.createServer(common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

server.on('clientError', common.mustCall());
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-insecure-parser-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends an invalid header.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -30,7 +30,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -59,7 +59,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -75,7 +75,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-max-header-size-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends larger headers than what would otherwise be allowed.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -29,7 +29,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -57,7 +57,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -73,7 +73,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-sync-write-error-during-continue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Regression test for the crash reported in
// https://github.com/nodejs/node/issues/15102 (httpParser.finish() is called
// during httpParser.execute()):

{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

serverSide.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString('utf8'), `\
Expand Down
Loading

0 comments on commit 7c21bb9

Please sign in to comment.