Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: Expose DuplexPair API #34111

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
and [`stream.addAbortSignal()`][].
[`stream.duplexPair()`][],
[`stream.pipeline()`][],
[`stream.finished()`][]
[`stream.Readable.from()`][], and
[`stream.addAbortSignal()`][].

### Streams Promises API

Expand Down Expand Up @@ -2681,6 +2684,30 @@ unless `emitClose` is set in false.
Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

#### `stream.duplexPair([options])`

<!-- YAML
added: REPLACEME
-->

* `options` {Object} A value to pass to both [`Duplex`][] constructors,
to set options such as buffering.
* Returns: {Array} of two [`Duplex`][] instances.

The utility function `duplexPair` returns an Array with two items,
each being a `Duplex` stream connected to the other side:

```js
const [ sideA, sideB ] = duplexPair();
```

Whatever is written to one stream is made readable on the other. It provides
behavior analogous to a network connection, where the data written by the client
becomes readable by the server, and vice-versa.

The Duplex streams are symmetrical; one or the other may be used without any
difference in behavior.

### `stream.finished(stream[, options], callback)`

<!-- YAML
Expand Down Expand Up @@ -4873,6 +4900,7 @@ contain multi-byte characters.
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.duplexPair()`]: #streamduplexpairoptions
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options
[`stream.pipeline()`]: #streampipelinesource-transforms-destination-callback
Expand Down
62 changes: 62 additions & 0 deletions lib/internal/streams/duplexpair.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict';
const {
Symbol,
} = primordials;

const { Duplex } = require('stream');
const assert = require('internal/assert');

const kCallback = Symbol('Callback');
const kInitOtherSide = Symbol('InitOtherSide');

class DuplexSide extends Duplex {
#otherSide = null;

constructor(options) {
super(options);
this[kCallback] = null;
this.#otherSide = null;
}

[kInitOtherSide](otherSide) {
// Ensure this can only be set once, to enforce encapsulation.
if (this.#otherSide === null) {
this.#otherSide = otherSide;
} else {
assert(this.#otherSide === null);
}
}

_read() {
const callback = this[kCallback];
if (callback) {
this[kCallback] = null;
callback();
}
}

_write(chunk, encoding, callback) {
assert(this.#otherSide !== null);
assert(this.#otherSide[kCallback] === null);
if (chunk.length === 0) {
process.nextTick(callback);
} else {
this.#otherSide.push(chunk);
this.#otherSide[kCallback] = callback;
}
}

_final(callback) {
this.#otherSide.on('end', callback);
this.#otherSide.push(null);
}
}

function duplexPair(options) {
const side0 = new DuplexSide(options);
const side1 = new DuplexSide(options);
side0[kInitOtherSide](side1);
side1[kInitOtherSide](side0);
return [ side0, side1 ];
}
module.exports = duplexPair;
1 change: 1 addition & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.duplexPair = require('internal/streams/duplexpair');
Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Expand Down
9 changes: 0 additions & 9 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ This directory contains modules used to test the Node.js implementation.
* [CPU Profiler module](#cpu-profiler-module)
* [Debugger module](#debugger-module)
* [DNS module](#dns-module)
* [Duplex pair helper](#duplex-pair-helper)
* [Environment variables](#environment-variables)
* [Fixtures module](#fixtures-module)
* [Heap dump checker module](#heap-dump-checker-module)
Expand Down Expand Up @@ -669,14 +668,6 @@ Reads a Domain String and returns a Buffer containing the domain.
Takes in a parsed Object and writes its fields to a DNS packet as a Buffer
object.

## Duplex pair helper

The `common/duplexpair` module exports a single function `makeDuplexPair`,
which returns an object `{ clientSide, serverSide }` where each side is a
`Duplex` stream connected to the other side.

There is no difference between client or server side beyond their names.

## Environment variables

The behavior of the Node.js test suite can be altered using the following
Expand Down
48 changes: 0 additions & 48 deletions test/common/duplexpair.js

This file was deleted.

1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ if (common.isMainThread) {
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/duplexpair',
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-gc-tls-external-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const makeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');
const onGC = require('../common/ongc');
const assert = require('assert');
const tls = require('tls');
Expand Down Expand Up @@ -37,7 +37,7 @@ function connect() {
return;
}

const { clientSide, serverSide } = makeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const tlsSocket = tls.connect({ socket: clientSide });
tlsSocket.on('error', common.mustCall(connect));
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-agent-domain-reused-gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const common = require('../common');
const http = require('http');
const async_hooks = require('async_hooks');
const makeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Regression test for https://github.com/nodejs/node/issues/30122
// When a domain is attached to an http Agent’s ReusedHandle object, that
Expand Down Expand Up @@ -36,7 +36,7 @@ async_hooks.createHook({
// attached to too many objects that use strong references (timers, the network
// socket handle, etc.) and wrap the client side in a JSStreamSocket so we don’t
// have to implement the whole _handle API ourselves.
const { serverSide, clientSide } = makeDuplexPair();
const [ serverSide, clientSide ] = duplexPair();
const JSStreamSocket = require('internal/js_stream_socket');
const wrappedClientSide = new JSStreamSocket(clientSide);

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-http-generic-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test 1: Simple HTTP test, no keep-alive.
{
Expand All @@ -13,7 +13,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

const req = http.request({
Expand All @@ -37,7 +37,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}, 2));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

function doRequest(cb) {
Expand Down Expand Up @@ -77,7 +77,7 @@ const MakeDuplexPair = require('../common/duplexpair');
});
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand Down Expand Up @@ -117,7 +117,7 @@ const MakeDuplexPair = require('../common/duplexpair');

}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand All @@ -143,7 +143,7 @@ const MakeDuplexPair = require('../common/duplexpair');
{
const server = http.createServer(common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
server.emit('connection', serverSide);

server.on('clientError', common.mustCall());
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-insecure-parser-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends an invalid header.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -30,7 +30,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -59,7 +59,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -75,7 +75,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-max-header-size-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends larger headers than what would otherwise be allowed.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -29,7 +29,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -57,7 +57,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -73,7 +73,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-sync-write-error-during-continue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { duplexPair } = require('stream');

// Regression test for the crash reported in
// https://github.com/nodejs/node/issues/15102 (httpParser.finish() is called
// during httpParser.execute()):

{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = duplexPair();

serverSide.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString('utf8'), `\
Expand Down
Loading
Loading