From 9e0d18fd3f7e1fcd31ba6f714564d6fdd98b2eb9 Mon Sep 17 00:00:00 2001 From: Carlos Lopez Date: Sun, 14 Jun 2020 11:10:32 -0400 Subject: [PATCH] http2: use and support non-empty DATA frame with END_STREAM flag Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: https://github.com/nodejs/node/issues/31309 Fixes: https://github.com/nodejs/node/issues/33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback PR-URL: https://github.com/nodejs/node/pull/33875 Backport-PR-URL: https://github.com/nodejs/node/pull/34838 Reviewed-By: Anna Henningsen Reviewed-By: James M Snell --- lib/internal/http2/core.js | 104 ++++++++++++++---- src/node_http2.cc | 13 ++- .../test-http2-misbehaving-multiplex.js | 56 +++++++--- .../test-http2-pack-end-stream-flag.js | 61 ++++++++++ test/parallel/test-http2-padding-aligned.js | 2 +- test/parallel/test-http2-perf_hooks.js | 2 +- 6 files changed, 191 insertions(+), 47 deletions(-) create mode 100644 test/parallel/test-http2-pack-end-stream-flag.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index f4a53c931850fd..1bd0a9f7ae90bc 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter { streams: new Map(), pendingStreams: new Set(), pendingAck: 0, + shutdownWritableCalled: false, writeQueueSize: 0, originSet: undefined }; @@ -1702,6 +1703,26 @@ function afterShutdown(status) { stream[kMaybeDestroy](); } +function shutdownWritable(callback) { + const handle = this[kHandle]; + if (!handle) return callback(); + const state = this[kState]; + if (state.shutdownWritableCalled) { + // Backport v14.x: Session required for debugging stream object + // debugStreamObj(this, 'shutdownWritable() already called'); + return callback(); + } + state.shutdownWritableCalled = true; + + const req = new ShutdownWrap(); + req.oncomplete = afterShutdown; + req.callback = callback; + req.handle = handle; + const err = handle.shutdown(req); + if (err === 1) // synchronous finish + return afterShutdown.call(req, 0); +} + function finishSendTrailers(stream, headersList) { // The stream might be destroyed and in that case // there is nothing to do. @@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex { let req; + let waitingForWriteCallback = true; + let waitingForEndCheck = true; + let writeCallbackErr; + let endCheckCallbackErr; + const done = () => { + if (waitingForEndCheck || waitingForWriteCallback) return; + const err = writeCallbackErr || endCheckCallbackErr; + // writeGeneric does not destroy on error and + // we cannot enable autoDestroy, + // so make sure to destroy on error. + if (err) { + this.destroy(err); + } + cb(err); + }; + const writeCallback = (err) => { + waitingForWriteCallback = false; + writeCallbackErr = err; + done(); + }; + const endCheckCallback = (err) => { + waitingForEndCheck = false; + endCheckCallbackErr = err; + done(); + }; + // Shutdown write stream right after last chunk is sent + // so final DATA frame can include END_STREAM flag + process.nextTick(() => { + if (writeCallbackErr || + !this._writableState.ending || + this._writableState.buffered.length || + (this[kState].flags & STREAM_FLAGS_HAS_TRAILERS)) + return endCheckCallback(); + // Backport v14.x: Session required for debugging stream object + // debugStreamObj(this, 'shutting down writable on last write'); + shutdownWritable.call(this, endCheckCallback); + }); + if (writev) - req = writevGeneric(this, data, cb); + req = writevGeneric(this, data, writeCallback); else - req = writeGeneric(this, data, encoding, cb); + req = writeGeneric(this, data, encoding, writeCallback); trackWriteState(this, req.bytes); } @@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex { } _final(cb) { - const handle = this[kHandle]; if (this.pending) { this.once('ready', () => this._final(cb)); - } else if (handle !== undefined) { - debugStreamObj(this, '_final shutting down'); - const req = new ShutdownWrap(); - req.oncomplete = afterShutdown; - req.callback = cb; - req.handle = handle; - const err = handle.shutdown(req); - if (err === 1) // synchronous finish - return afterShutdown.call(req, 0); - } else { - cb(); + return; } + // Backport v14.x: Session required for debugging stream object + // debugStreamObj(this, 'shutting down writable on _final'); + shutdownWritable.call(this, cb); } _read(nread) { @@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex { debugStream(this[kID] || 'pending', session[kType], 'destroying stream'); const state = this[kState]; - const sessionCode = session[kState].goawayCode || - session[kState].destroyCode; - const code = err != null ? - sessionCode || NGHTTP2_INTERNAL_ERROR : - state.rstCode || sessionCode; + const sessionState = session[kState]; + const sessionCode = sessionState.goawayCode || sessionState.destroyCode; + + // If a stream has already closed successfully, there is no error + // to report from this stream, even if the session has errored. + // This can happen if the stream was already in process of destroying + // after a successful close, but the session had a error between + // this stream's close and destroy operations. + // Previously, this always overrode a successful close operation code + // NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator. + const code = (err != null ? + (sessionCode || NGHTTP2_INTERNAL_ERROR) : + (this.closed ? this.rstCode : sessionCode) + ); const hasHandle = handle !== undefined; if (!this.closed) @@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex { if (hasHandle) { handle.destroy(); - session[kState].streams.delete(id); + sessionState.streams.delete(id); } else { - session[kState].pendingStreams.delete(this); + sessionState.pendingStreams.delete(this); } // Adjust the write queue size for accounting - session[kState].writeQueueSize -= state.writeQueueSize; + sessionState.writeQueueSize -= state.writeQueueSize; state.writeQueueSize = 0; // RST code 8 not emitted as an error as its used by clients to signify diff --git a/src/node_http2.cc b/src/node_http2.cc index 5aa817a1ac8442..b5b38fc976158b 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -732,7 +732,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen, // quite expensive. This is a potential performance optimization target later. ssize_t Http2Session::ConsumeHTTP2Data() { CHECK_NOT_NULL(stream_buf_.base); - CHECK_LT(stream_buf_offset_, stream_buf_.len); + CHECK_LE(stream_buf_offset_, stream_buf_.len); size_t read_len = stream_buf_.len - stream_buf_offset_; // multiple side effects. @@ -753,11 +753,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() { CHECK_GT(ret, 0); CHECK_LE(static_cast(ret), read_len); - if (static_cast(ret) < read_len) { - // Mark the remainder of the data as available for later consumption. - stream_buf_offset_ += ret; - return ret; - } + // Mark the remainder of the data as available for later consumption. + // Even if all bytes were received, a paused stream may delay the + // nghttp2_on_frame_recv_callback which may have an END_STREAM flag. + stream_buf_offset_ += ret; + return ret; } // We are done processing the current input chunk. @@ -1093,6 +1093,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle, if (session->is_write_in_progress()) { CHECK(session->is_reading_stopped()); session->set_receive_paused(); + Debug(session, "receive paused"); return NGHTTP2_ERR_PAUSE; } diff --git a/test/parallel/test-http2-misbehaving-multiplex.js b/test/parallel/test-http2-misbehaving-multiplex.js index fbd8add8906b7e..0e057e1ed28e7a 100644 --- a/test/parallel/test-http2-misbehaving-multiplex.js +++ b/test/parallel/test-http2-misbehaving-multiplex.js @@ -2,6 +2,7 @@ // Flags: --expose-internals const common = require('../common'); +const assert = require('assert'); if (!common.hasCrypto) common.skip('missing crypto'); @@ -13,16 +14,36 @@ const h2test = require('../common/http2'); let client; const server = h2.createServer(); +let gotFirstStreamId1; server.on('stream', common.mustCall((stream) => { stream.respond(); stream.end('ok'); - // The error will be emitted asynchronously - stream.on('error', common.expectsError({ - constructor: NghttpError, - code: 'ERR_HTTP2_ERROR', - message: 'Stream was already closed or invalid' - })); + // Http2Server should be fast enough to respond to and close + // the first streams with ID 1 and ID 3 without errors. + + // Test for errors in 'close' event to ensure no errors on some streams. + stream.on('error', () => {}); + stream.on('close', (err) => { + if (stream.id === 1) { + if (gotFirstStreamId1) { + // We expect our outgoing frames to fail on Stream ID 1 the second time + // because a stream with ID 1 was already closed before. + common.expectsError({ + constructor: NghttpError, + code: 'ERR_HTTP2_ERROR', + message: 'Stream was already closed or invalid' + }); + return; + } + gotFirstStreamId1 = true; + } + assert.strictEqual(err, undefined); + }); + + // Stream ID 5 should never reach the server + assert.notStrictEqual(stream.id, 5); + }, 2)); server.on('session', common.mustCall((session) => { @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => { const settings = new h2test.SettingsFrame(); const settingsAck = new h2test.SettingsFrame(true); -const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true); -const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true); -const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true); -const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true); +// HeadersFrame(id, payload, padding, END_STREAM) +const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true); +const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true); +const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true); server.listen(0, () => { client = net.connect(server.address().port, () => { client.write(h2test.kClientMagic, () => { client.write(settings.data, () => { client.write(settingsAck.data); - // This will make it ok. - client.write(head1.data, () => { - // This will make it ok. - client.write(head2.data, () => { + // Stream ID 1 frame will make it OK. + client.write(id1.data, () => { + // Stream ID 3 frame will make it OK. + client.write(id3.data, () => { + // A second Stream ID 1 frame should fail. // This will cause an error to occur because the client is // attempting to reuse an already closed stream. This must // cause the server session to be torn down. - client.write(head3.data, () => { - // This won't ever make it to the server - client.write(head4.data); + client.write(id1.data, () => { + // This Stream ID 5 frame will never make it to the server + client.write(id5.data); }); }); }); diff --git a/test/parallel/test-http2-pack-end-stream-flag.js b/test/parallel/test-http2-pack-end-stream-flag.js new file mode 100644 index 00000000000000..f6bb4452d95a77 --- /dev/null +++ b/test/parallel/test-http2-pack-end-stream-flag.js @@ -0,0 +1,61 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); + +const { PerformanceObserver } = require('perf_hooks'); + +const server = http2.createServer(); + +server.on('stream', (stream, headers) => { + stream.respond({ + 'content-type': 'text/html', + ':status': 200 + }); + switch (headers[':path']) { + case '/singleEnd': + stream.end('OK'); + break; + case '/sequentialEnd': + stream.write('OK'); + stream.end(); + break; + case '/delayedEnd': + stream.write('OK', () => stream.end()); + break; + } +}); + +function testRequest(path, targetFrameCount, callback) { + const obs = new PerformanceObserver((list, observer) => { + const entry = list.getEntries()[0]; + if (entry.name !== 'Http2Session') return; + if (entry.type !== 'client') return; + assert.strictEqual(entry.framesReceived, targetFrameCount); + observer.disconnect(); + callback(); + }); + obs.observe({ entryTypes: ['http2'] }); + const client = http2.connect(`http://localhost:${server.address().port}`, () => { + const req = client.request({ ':path': path }); + req.resume(); + req.end(); + req.on('end', () => client.close()); + }); +} + +// SETTINGS => SETTINGS => HEADERS => DATA +const MIN_FRAME_COUNT = 4; + +server.listen(0, () => { + testRequest('/singleEnd', MIN_FRAME_COUNT, () => { + testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => { + testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => { + server.close(); + }); + }); + }); +}); diff --git a/test/parallel/test-http2-padding-aligned.js b/test/parallel/test-http2-padding-aligned.js index 432e3e8629f379..f687c02a98dc6e 100644 --- a/test/parallel/test-http2-padding-aligned.js +++ b/test/parallel/test-http2-padding-aligned.js @@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair'); // The lengths of the expected writes... note that this is highly // sensitive to how the internals are implemented. const serverLengths = [24, 9, 9, 32]; - const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16]; + const clientLengths = [9, 9, 48, 9, 1, 21, 1]; // Adjust for the 24-byte preamble and two 9-byte settings frames, and // the result must be equally divisible by 8 diff --git a/test/parallel/test-http2-perf_hooks.js b/test/parallel/test-http2-perf_hooks.js index 0fcbc323e01301..1023d70ff73f2c 100644 --- a/test/parallel/test-http2-perf_hooks.js +++ b/test/parallel/test-http2-perf_hooks.js @@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => { break; case 'client': assert.strictEqual(entry.streamCount, 1); - assert.strictEqual(entry.framesReceived, 8); + assert.strictEqual(entry.framesReceived, 7); break; default: assert.fail('invalid Http2Session type');