diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 9247292ff47fce..0b42bcd60c82ee 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -306,8 +306,23 @@ function onStreamClose(code) { if (state.fd !== undefined) tryClose(state.fd); - stream.push(null); - stream[kMaybeDestroy](null, code); + + // Defer destroy we actually emit end. + if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { + // If errored or ended, we can destroy immediately. + stream[kMaybeDestroy](null, code); + } else { + // Wait for end to destroy. + stream.on('end', stream[kMaybeDestroy]); + // Push a null so the stream can end whenever the client consumes + // it completely. + stream.push(null); + + // Same as net. + if (stream.readableLength === 0) { + stream.read(0); + } + } } // Receives a chunk of data for a given stream and forwards it on @@ -325,11 +340,19 @@ function onStreamRead(nread, buf) { } return; } + // Last chunk was received. End the readable side. debug(`Http2Stream ${stream[kID]} [Http2Session ` + `${sessionName(stream[kSession][kType])}]: ending readable.`); - stream.push(null); - stream[kMaybeDestroy](); + + // defer this until we actually emit end + if (stream._readableState.endEmitted) { + stream[kMaybeDestroy](); + } else { + stream.on('end', stream[kMaybeDestroy]); + stream.push(null); + stream.read(0); + } } // Called when the remote peer settings have been updated. @@ -1825,21 +1848,25 @@ class Http2Stream extends Duplex { session[kMaybeDestroy](); process.nextTick(emit, this, 'close', code); callback(err); - } + } // The Http2Stream can be destroyed if it has closed and if the readable // side has received the final chunk. [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) { - if (error == null) { - if (code === NGHTTP2_NO_ERROR && - (!this._readableState.ended || - !this._writableState.ended || - this._writableState.pendingcb > 0 || - !this.closed)) { - return; - } + if (error || code !== NGHTTP2_NO_ERROR) { + this.destroy(error); + return; + } + + // TODO(mcollina): remove usage of _*State properties + if (this._readableState.ended && + this._writableState.ended && + this._writableState.pendingcb === 0 && + this.closed) { + this.destroy(); + // This should return, but eslint complains. + // return } - this.destroy(error); } } diff --git a/test/parallel/test-http2-compat-short-stream-client-server.js b/test/parallel/test-http2-compat-short-stream-client-server.js new file mode 100644 index 00000000000000..f7ef9412106f59 --- /dev/null +++ b/test/parallel/test-http2-compat-short-stream-client-server.js @@ -0,0 +1,50 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const { Readable } = require('stream'); + +const server = http2.createServer(common.mustCall((req, res) => { + res.setHeader('content-type', 'text/html'); + const input = new Readable({ + read() { + this.push('test'); + this.push(null); + } + }); + input.pipe(res); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + assert.strictEqual(headers['content-type'], 'text/html'); + })); + + let data = ''; + + const notCallClose = common.mustNotCall(); + + setTimeout(() => { + req.setEncoding('utf8'); + req.removeListener('close', notCallClose); + req.on('close', common.mustCall(() => { + server.close(); + client.close(); + })); + req.on('data', common.mustCallAtLeast((d) => data += d)); + req.on('end', common.mustCall(() => { + assert.strictEqual(data, 'test'); + })); + }, common.platformTimeout(100)); + + req.on('close', notCallClose); +})); diff --git a/test/parallel/test-http2-short-stream-client-server.js b/test/parallel/test-http2-short-stream-client-server.js new file mode 100644 index 00000000000000..e632b8d96b9ea9 --- /dev/null +++ b/test/parallel/test-http2-short-stream-client-server.js @@ -0,0 +1,55 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const { Readable } = require('stream'); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({ + ':status': 200, + 'content-type': 'text/html' + }); + const input = new Readable({ + read() { + this.push('test'); + this.push(null); + } + }); + input.pipe(stream); +})); + + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + assert.strictEqual(headers['content-type'], 'text/html'); + })); + + let data = ''; + + const notCallClose = common.mustNotCall(); + + setTimeout(() => { + req.setEncoding('utf8'); + req.removeListener('close', notCallClose); + req.on('close', common.mustCall(() => { + server.close(); + client.close(); + })); + req.on('data', common.mustCallAtLeast((d) => data += d)); + req.on('end', common.mustCall(() => { + assert.strictEqual(data, 'test'); + })); + }, common.platformTimeout(100)); + + req.on('close', notCallClose); +}));