From a949d7955693a77d6a89eacf9a603db1ee96c3e8 Mon Sep 17 00:00:00 2001 From: dead_horse Date: Wed, 2 Dec 2015 13:21:38 +0800 Subject: [PATCH] fix: should not destroy streams use black-hole-stream to make sure stream's data has been read --- lib/response.js | 14 ++++++++-- package.json | 7 ++++- test/application.js | 67 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/lib/response.js b/lib/response.js index 8027c19e7..06ca13eec 100644 --- a/lib/response.js +++ b/lib/response.js @@ -6,6 +6,7 @@ */ var contentDisposition = require('content-disposition'); +var BlackHoleStream = require('black-hole-stream'); var ensureErrorHandler = require('error-inject'); var getType = require('mime-types').contentType; var onFinish = require('on-finished'); @@ -15,6 +16,8 @@ var typeis = require('type-is').is; var statuses = require('statuses'); var destroy = require('destroy'); var assert = require('assert'); +var Stream = require('stream'); +var http = require('http'); var path = require('path'); var vary = require('vary'); var extname = path.extname; @@ -161,8 +164,15 @@ module.exports = { } // stream - if ('function' == typeof val.pipe) { - onFinish(this.res, destroy.bind(null, val)); + if (val instanceof Stream) { + onFinish(this.res, function(){ + // don't destroy http IncomingMessage, keep `keep-alive` conncetion alive. + if (val instanceof http.IncomingMessage) { + if (val.readable) val.pipe(new BlackHoleStream()); + } else { + destroy(val); + } + }); ensureErrorHandler(val, this.ctx.onerror); // overwriting diff --git a/package.json b/package.json index 42933856d..c217f4aec 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "license": "MIT", "dependencies": { "accepts": "^1.2.2", + "black-hole-stream": "0.0.1", "co": "^4.4.0", "composition": "^2.1.1", "content-disposition": "~0.5.0", @@ -43,14 +44,18 @@ "vary": "^1.0.0" }, "devDependencies": { + "agentkeepalive": "~2.0.3", "babel": "^5.0.0", + "freeport": "~1.0.5", "istanbul": "^0.4.0", "make-lint": "^1.0.1", "mocha": "^2.0.1", + "pedding": "^1.0.0", "should": "^6.0.3", "should-http": "0.0.3", "supertest": "^1.0.1", - "test-console": "^0.7.1" + "test-console": "^0.7.1", + "urllib": "^2.5.0" }, "engines": { "node": ">= 0.12.0", diff --git a/test/application.js b/test/application.js index 0c1947c4e..ddb0958ca 100644 --- a/test/application.js +++ b/test/application.js @@ -2,9 +2,13 @@ 'use strict'; var stderr = require('test-console').stderr; +var Agent = require('agentkeepalive'); var request = require('supertest'); var statuses = require('statuses'); +var freeport = require('freeport'); +var pedding = require('pedding'); var assert = require('assert'); +var urllib = require('urllib'); var http = require('http'); var koa = require('..'); var fs = require('fs'); @@ -872,6 +876,69 @@ describe('app.respond', function(){ .get('/') .expect(404, done); }) + + it('should ensure stream do not leak', function(done){ + done = pedding(3, done); + var app = koa(); + let stream1 = fs.createReadStream(__filename); + let stream2 = fs.createReadStream(__filename); + stream1.once('close', done); + stream2.once('close', done); + + app.use(function *(){ + this.body = stream1; + this.body = stream2; + }); + + var server = app.listen(); + + request(server) + .head('/') + .expect(200, done); + }) + }) + + describe('when .body is a http keepalive IncomingMessage', function(){ + var target; + var port; + before(function(done){ + var app = koa(); + app.use(function *(){ + this.body = fs.createReadStream(__filename); + }); + + freeport(function(err, p){ + port = p || 12384; + target = app.listen(port, done); + }); + }) + + after(function(){ + target.close(); + }) + + it('should not destroy keepalive connection', function(done){ + done = pedding(2, done); + var app = koa(); + app.use(function *(){ + var remote = yield urllib.request('http://127.0.0.1:' + port, { + streaming: true, + agent: new Agent() + }); + var res = remote.res; + this.body = res; + res.once('end', function(){ + assert.equal(res.readable, false); + assert.equal(res.socket.destroyed, false); + done(); + }); + }); + + var server = app.listen(); + request(server) + .head('/') + .expect(200, done); + }) }) describe('when .body is an Object', function(){