From 50536d1e02ad42bdf262381034805378b98bfa53 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Sat, 16 Jul 2022 13:16:51 +0000 Subject: [PATCH] fix: premature close with chunked transfer encoding and for async iterators in Node 12 (#1172) * fix: premature close with chunked transfer encoding and for async iterators in Node 12 This PR backports the fix from #1064 to the `2.x.x` branch following the [comment here](https://github.com/node-fetch/node-fetch/pull/1064#issuecomment-849167400). I had to add some extra babel config to allow using the `for await..of` syntax in the tests. The config is only needed for the tests as this syntax is not used in the implementation. * chore: fix up tests for node 6+ * chore: codecov dropped support for node < 8 without shipping major * chore: npm7 strips empty dependencies hash during install * chore: pin deps to versions that work on node 4 * chore: do not emit close error after aborting a request * chore: test on node 4-16 * chore: simplify chunked transfer encoding bad ending * chore: avoid calling .destroy as it is not in every node.js release * chore: listen for response close as socket is reused and shows warnings --- .babelrc | 6 ++-- .travis.yml | 2 ++ README.md | 43 +++++++++++++++++++++++++++++ package.json | 4 ++- src/index.js | 68 +++++++++++++++++++++++++++++++++++++++++++++- test/server.js | 28 +++++++++++++++++++ test/test.js | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 221 insertions(+), 4 deletions(-) diff --git a/.babelrc b/.babelrc index 6a95c25e7..901200bce 100644 --- a/.babelrc +++ b/.babelrc @@ -14,7 +14,8 @@ } ] ], plugins: [ - './build/babel-plugin' + './build/babel-plugin', + 'transform-async-generator-functions' ] }, coverage: { @@ -31,7 +32,8 @@ ], plugins: [ [ 'istanbul', { exclude: [ 'src/blob.js', 'build', 'test' ] } ], - './build/babel-plugin' + './build/babel-plugin', + 'transform-async-generator-functions' ] }, rollup: { diff --git a/.travis.yml b/.travis.yml index 
3bb109e15..7d7081b33 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,8 @@ node_js: - "6" - "8" - "10" + - "12" + - "14" - "node" env: - FORMDATA_VERSION=1.0.0 diff --git a/README.md b/README.md index 2dde74289..4f87a59a0 100644 --- a/README.md +++ b/README.md @@ -188,6 +188,49 @@ fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png') }); ``` +In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch +errors -- the longer a response runs, the more likely it is to encounter an error. + +```js +const fetch = require('node-fetch'); +const response = await fetch('https://httpbin.org/stream/3'); +try { + for await (const chunk of response.body) { + console.dir(JSON.parse(chunk.toString())); + } +} catch (err) { + console.error(err.stack); +} +``` + +In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams +did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors +directly from the stream and wait on the response to fully close. + +```js +const fetch = require('node-fetch'); +const read = async body => { + let error; + body.on('error', err => { + error = err; + }); + for await (const chunk of body) { + console.dir(JSON.parse(chunk.toString())); + } + return new Promise((resolve, reject) => { + body.on('close', () => { + error ? reject(error) : resolve(); + }); + }); +}; +try { + const response = await fetch('https://httpbin.org/stream/3'); + await read(response.body); +} catch (err) { + console.error(err.stack); +} +``` + +#### Buffer If you prefer to cache binary data in full, use buffer(). 
(NOTE: `buffer()` is a `node-fetch`-only API) diff --git a/package.json b/package.json index 3c1bd8da7..ace7d8c4f 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,9 @@ "abortcontroller-polyfill": "^1.3.0", "babel-core": "^6.26.3", "babel-plugin-istanbul": "^4.1.6", - "babel-preset-env": "^1.6.1", + "babel-plugin-transform-async-generator-functions": "^6.24.1", + "babel-polyfill": "^6.26.0", + "babel-preset-env": "1.4.0", "babel-register": "^6.16.3", "chai": "^3.5.0", "chai-as-promised": "^7.1.1", diff --git a/src/index.js b/src/index.js index 93945f6d8..1a25e800d 100644 --- a/src/index.js +++ b/src/index.js @@ -67,7 +67,7 @@ export default function fetch(url, opts) { let error = new AbortError('The user aborted a request.'); reject(error); if (request.body && request.body instanceof Stream.Readable) { - request.body.destroy(error); + destroyStream(request.body, error); } if (!response || !response.body) return; response.body.emit('error', error); @@ -108,9 +108,41 @@ export default function fetch(url, opts) { req.on('error', err => { reject(new FetchError(`request to ${request.url} failed, reason: ${err.message}`, 'system', err)); + + if (response && response.body) { + destroyStream(response.body, err); + } + finalize(); }); + fixResponseChunkedTransferBadEnding(req, err => { + if (signal && signal.aborted) { + return + } + + destroyStream(response.body, err); + }); + + /* c8 ignore next 18 */ + if (parseInt(process.version.substring(1)) < 14) { + // Before Node.js 14, pipeline() does not fully support async iterators and does not always + // properly handle when the socket close/end events are out of order. 
+ req.on('socket', s => { + s.addListener('close', hadError => { + // if a data listener is still present we didn't end cleanly + const hasDataListener = s.listenerCount('data') > 0 + + // if end happened before close but the socket didn't emit an error, do it now + if (response && hasDataListener && !hadError && !(signal && signal.aborted)) { + const err = new Error('Premature close'); + err.code = 'ERR_STREAM_PREMATURE_CLOSE'; + response.body.emit('error', err); + } + }); + }); + } + req.on('response', res => { clearTimeout(reqTimeout); @@ -303,6 +335,40 @@ export default function fetch(url, opts) { }; +function fixResponseChunkedTransferBadEnding(request, errorCallback) { + let socket; + + request.on('socket', s => { + socket = s; + }); + + request.on('response', response => { + const {headers} = response; + if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) { + response.once('close', hadError => { + // if a data listener is still present we didn't end cleanly + const hasDataListener = socket.listenerCount('data') > 0; + + if (hasDataListener && !hadError) { + const err = new Error('Premature close'); + err.code = 'ERR_STREAM_PREMATURE_CLOSE'; + errorCallback(err); + } + }); + } + }); +} + +function destroyStream (stream, err) { + if (stream.destroy) { + stream.destroy(err); + } else { + // node < 8 + stream.emit('error', err); + stream.end(); + } +} + /** * Redirect code matching * diff --git a/test/server.js b/test/server.js index ffe4733ff..ffff88877 100644 --- a/test/server.js +++ b/test/server.js @@ -329,6 +329,34 @@ export default class TestServer { res.destroy(); } + if (p === '/error/premature/chunked') { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Transfer-Encoding': 'chunked' + }); + + // Transfer-Encoding: 'chunked' sends chunk sizes followed by the + // chunks - https://en.wikipedia.org/wiki/Chunked_transfer_encoding + const sendChunk = (obj) => { + const data = JSON.stringify(obj) + + 
res.write(`${data.length}\r\n`) + res.write(`${data}\r\n`) + } + + sendChunk({data: 'hi'}) + + setTimeout(() => { + sendChunk({data: 'bye'}) + }, 200); + + setTimeout(() => { + // should send '0\r\n\r\n' to end the response properly but instead + // just close the connection + res.destroy(); + }, 400); + } + if (p === '/error/json') { res.statusCode = 200; res.setHeader('Content-Type', 'application/json'); diff --git a/test/test.js b/test/test.js index 719b5b960..36fe7703e 100644 --- a/test/test.js +++ b/test/test.js @@ -1,4 +1,7 @@ +import 'babel-core/register' +import 'babel-polyfill' + // test tools import chai from 'chai'; import chaiPromised from 'chai-as-promised'; @@ -552,6 +555,77 @@ describe('node-fetch', () => { .and.have.property('code', 'ECONNRESET'); }); + it('should handle network-error in chunked response', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + return expect(new Promise((resolve, reject) => { + res.body.on('error', reject); + res.body.on('close', resolve); + })).to.eventually.be.rejectedWith(Error, 'Premature close') + .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE'); + }); + }); + + // Skip test if streams are not async iterators (node < 10) + const itAsyncIterator = Boolean(new stream.PassThrough()[Symbol.asyncIterator]) ? 
it : it.skip; + + itAsyncIterator('should handle network-error in chunked response async iterator', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + const read = async body => { + const chunks = []; + + if (process.version < 'v14') { + // In Node.js 12, some errors don't come out in the async iterator; we have to pick + // them up from the event-emitter and then throw them after the async iterator + let error; + body.on('error', err => { + error = err; + }); + + for await (const chunk of body) { + chunks.push(chunk); + } + + if (error) { + throw error; + } + + return new Promise(resolve => { + body.on('close', () => resolve(chunks)); + }); + } + + for await (const chunk of body) { + chunks.push(chunk); + } + + return chunks; + }; + + return expect(read(res.body)) + .to.eventually.be.rejectedWith(Error, 'Premature close') + .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE'); + }); + }); + + it('should handle network-error in chunked response in consumeBody', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + return expect(res.text()) + .to.eventually.be.rejectedWith(Error, 'Premature close'); + }); + }); + it('should handle DNS-error response', function() { const url = 'http://domain.invalid'; return expect(fetch(url)).to.eventually.be.rejected