Skip to content

Commit 7e82a56

Browse files
mscdexrvagg
authored andcommitted
http: allow async createConnection()
This commit adds support for async createConnection() implementations and is still backwards compatible with synchronous createConnection() implementations. This commit also makes the http client more friendly with generic stream objects produced by createConnection() by checking stream.writable instead of stream.destroyed as the latter is currently a net.Socket-ism and not set by the core stream implementations. PR-URL: #4638 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 9e6ad2d commit 7e82a56

File tree

4 files changed

+173
-64
lines changed

4 files changed

+173
-64
lines changed

doc/api/http.markdown

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ options.agent = keepAliveAgent;
117117
http.request(options, onResponseCallback);
118118
```
119119

120+
### agent.createConnection(options[, callback])
121+
122+
Produces a socket/stream to be used for HTTP requests.
123+
124+
By default, this function is the same as [`net.createConnection()`][]. However,
125+
custom Agents may override this method in case greater flexibility is desired.
126+
127+
A socket/stream can be supplied in one of two ways: by returning the
128+
socket/stream from this function, or by passing the socket/stream to `callback`.
129+
130+
`callback` has a signature of `(err, stream)`.
131+
120132
### agent.destroy()
121133

122134
Destroy any sockets that are currently in use by the agent.
@@ -1112,6 +1124,10 @@ Options:
11121124
- `Agent` object: explicitly use the passed in `Agent`.
11131125
- `false`: opts out of connection pooling with an Agent, defaults request to
11141126
`Connection: close`.
1127+
- `createConnection`: A function that produces a socket/stream to use for the
1128+
request when the `agent` option is not used. This can be used to avoid
1129+
creating a custom Agent class just to override the default `createConnection`
1130+
function. See [`agent.createConnection()`][] for more details.
11151131

11161132
The optional `callback` parameter will be added as a one time listener for
11171133
the `'response'` event.
@@ -1187,6 +1203,7 @@ There are a few special headers that should be noted.
11871203
[`'listening'`]: net.html#net_event_listening
11881204
[`'response'`]: #http_event_response
11891205
[`Agent`]: #http_class_http_agent
1206+
[`agent.createConnection`]: #http_agent_createconnection
11901207
[`Buffer`]: buffer.html#buffer_buffer
11911208
[`destroy()`]: #http_agent_destroy
11921209
[`EventEmitter`]: events.html#events_class_events_eventemitter
@@ -1198,6 +1215,7 @@ There are a few special headers that should be noted.
11981215
[`http.Server`]: #http_class_http_server
11991216
[`http.ServerResponse`]: #http_class_http_serverresponse
12001217
[`message.headers`]: #http_message_headers
1218+
[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener
12011219
[`net.Server`]: net.html#net_class_net_server
12021220
[`net.Server.close()`]: net.html#net_server_close_callback
12031221
[`net.Server.listen()`]: net.html#net_server_listen_handle_callback

lib/_http_agent.js

Lines changed: 67 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ function Agent(options) {
4444
var name = self.getName(options);
4545
debug('agent.on(free)', name);
4646

47-
if (!socket.destroyed &&
47+
if (socket.writable &&
4848
self.requests[name] && self.requests[name].length) {
4949
self.requests[name].shift().onSocket(socket);
5050
if (self.requests[name].length === 0) {
@@ -57,7 +57,7 @@ function Agent(options) {
5757
var req = socket._httpMessage;
5858
if (req &&
5959
req.shouldKeepAlive &&
60-
!socket.destroyed &&
60+
socket.writable &&
6161
self.keepAlive) {
6262
var freeSockets = self.freeSockets[name];
6363
var freeLen = freeSockets ? freeSockets.length : 0;
@@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) {
138138
} else if (sockLen < this.maxSockets) {
139139
debug('call onSocket', sockLen, freeLen);
140140
// If we are under maxSockets create a new one.
141-
req.onSocket(this.createSocket(req, options));
141+
this.createSocket(req, options, function(err, newSocket) {
142+
if (err) {
143+
process.nextTick(function() {
144+
req.emit('error', err);
145+
});
146+
return;
147+
}
148+
req.onSocket(newSocket);
149+
});
142150
} else {
143151
debug('wait for socket');
144152
// We are over limit so we'll add it to the queue.
@@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) {
149157
}
150158
};
151159

152-
Agent.prototype.createSocket = function(req, options) {
160+
Agent.prototype.createSocket = function(req, options, cb) {
153161
var self = this;
154162
options = util._extend({}, options);
155163
options = util._extend(options, self.options);
156164

157165
if (!options.servername) {
158166
options.servername = options.host;
159-
if (req) {
160-
var hostHeader = req.getHeader('host');
161-
if (hostHeader) {
162-
options.servername = hostHeader.replace(/:.*$/, '');
163-
}
167+
const hostHeader = req.getHeader('host');
168+
if (hostHeader) {
169+
options.servername = hostHeader.replace(/:.*$/, '');
164170
}
165171
}
166172

@@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) {
169175

170176
debug('createConnection', name, options);
171177
options.encoding = null;
172-
var s = self.createConnection(options);
173-
if (!self.sockets[name]) {
174-
self.sockets[name] = [];
175-
}
176-
this.sockets[name].push(s);
177-
debug('sockets', name, this.sockets[name].length);
178+
var called = false;
179+
const newSocket = self.createConnection(options, oncreate);
180+
if (newSocket)
181+
oncreate(null, newSocket);
182+
function oncreate(err, s) {
183+
if (called)
184+
return;
185+
called = true;
186+
if (err)
187+
return cb(err);
188+
if (!self.sockets[name]) {
189+
self.sockets[name] = [];
190+
}
191+
self.sockets[name].push(s);
192+
debug('sockets', name, self.sockets[name].length);
178193

179-
function onFree() {
180-
self.emit('free', s, options);
181-
}
182-
s.on('free', onFree);
183-
184-
function onClose(err) {
185-
debug('CLIENT socket onClose');
186-
// This is the only place where sockets get removed from the Agent.
187-
// If you want to remove a socket from the pool, just close it.
188-
// All socket errors end in a close event anyway.
189-
self.removeSocket(s, options);
190-
}
191-
s.on('close', onClose);
192-
193-
function onRemove() {
194-
// We need this function for cases like HTTP 'upgrade'
195-
// (defined by WebSockets) where we need to remove a socket from the
196-
// pool because it'll be locked up indefinitely
197-
debug('CLIENT socket onRemove');
198-
self.removeSocket(s, options);
199-
s.removeListener('close', onClose);
200-
s.removeListener('free', onFree);
201-
s.removeListener('agentRemove', onRemove);
194+
function onFree() {
195+
self.emit('free', s, options);
196+
}
197+
s.on('free', onFree);
198+
199+
function onClose(err) {
200+
debug('CLIENT socket onClose');
201+
// This is the only place where sockets get removed from the Agent.
202+
// If you want to remove a socket from the pool, just close it.
203+
// All socket errors end in a close event anyway.
204+
self.removeSocket(s, options);
205+
}
206+
s.on('close', onClose);
207+
208+
function onRemove() {
209+
// We need this function for cases like HTTP 'upgrade'
210+
// (defined by WebSockets) where we need to remove a socket from the
211+
// pool because it'll be locked up indefinitely
212+
debug('CLIENT socket onRemove');
213+
self.removeSocket(s, options);
214+
s.removeListener('close', onClose);
215+
s.removeListener('free', onFree);
216+
s.removeListener('agentRemove', onRemove);
217+
}
218+
s.on('agentRemove', onRemove);
219+
cb(null, s);
202220
}
203-
s.on('agentRemove', onRemove);
204-
return s;
205221
};
206222

207223
Agent.prototype.removeSocket = function(s, options) {
208224
var name = this.getName(options);
209-
debug('removeSocket', name, 'destroyed:', s.destroyed);
225+
debug('removeSocket', name, 'writable:', s.writable);
210226
var sets = [this.sockets];
211227

212228
// If the socket was destroyed, remove it from the free buffers too.
213-
if (s.destroyed)
229+
if (!s.writable)
214230
sets.push(this.freeSockets);
215231

216232
for (var sk = 0; sk < sets.length; sk++) {
@@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) {
231247
debug('removeSocket, have a request, make a socket');
232248
var req = this.requests[name][0];
233249
// If we have pending requests and a socket gets closed make a new one
234-
this.createSocket(req, options).emit('free');
250+
this.createSocket(req, options, function(err, newSocket) {
251+
if (err) {
252+
process.nextTick(function() {
253+
req.emit('error', err);
254+
});
255+
return;
256+
}
257+
newSocket.emit('free');
258+
});
235259
}
236260
};
237261

lib/_http_client.js

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ function ClientRequest(options, cb) {
3333
if (agent === false) {
3434
agent = new defaultAgent.constructor();
3535
} else if ((agent === null || agent === undefined) &&
36-
!options.createConnection) {
36+
typeof options.createConnection !== 'function') {
3737
agent = defaultAgent;
3838
}
3939
self.agent = agent;
@@ -118,10 +118,20 @@ function ClientRequest(options, cb) {
118118
self._renderHeaders());
119119
}
120120

121+
var called = false;
121122
if (self.socketPath) {
122123
self._last = true;
123124
self.shouldKeepAlive = false;
124-
self.onSocket(self.agent.createConnection({ path: self.socketPath }));
125+
const optionsPath = {
126+
path: self.socketPath
127+
};
128+
const newSocket = self.agent.createConnection(optionsPath, oncreate);
129+
if (newSocket && !called) {
130+
called = true;
131+
self.onSocket(newSocket);
132+
} else {
133+
return;
134+
}
125135
} else if (self.agent) {
126136
// If there is an agent we should default to Connection:keep-alive,
127137
// but only if the Agent will actually reuse the connection!
@@ -139,14 +149,37 @@ function ClientRequest(options, cb) {
139149
// No agent, default to Connection:close.
140150
self._last = true;
141151
self.shouldKeepAlive = false;
142-
if (options.createConnection) {
143-
self.onSocket(options.createConnection(options));
152+
if (typeof options.createConnection === 'function') {
153+
const newSocket = options.createConnection(options, oncreate);
154+
if (newSocket && !called) {
155+
called = true;
156+
self.onSocket(newSocket);
157+
} else {
158+
return;
159+
}
144160
} else {
145161
debug('CLIENT use net.createConnection', options);
146162
self.onSocket(net.createConnection(options));
147163
}
148164
}
149165

166+
function oncreate(err, socket) {
167+
if (called)
168+
return;
169+
called = true;
170+
if (err) {
171+
process.nextTick(function() {
172+
self.emit('error', err);
173+
});
174+
return;
175+
}
176+
self.onSocket(socket);
177+
self._deferToConnect(null, null, function() {
178+
self._flush();
179+
self = null;
180+
});
181+
}
182+
150183
self._deferToConnect(null, null, function() {
151184
self._flush();
152185
self = null;
Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,61 @@
11
'use strict';
2-
var common = require('../common');
3-
var assert = require('assert');
4-
var http = require('http');
5-
var net = require('net');
2+
const common = require('../common');
3+
const http = require('http');
4+
const net = require('net');
5+
const assert = require('assert');
66

7-
var create = 0;
8-
var response = 0;
9-
process.on('exit', function() {
10-
assert.equal(1, create, 'createConnection() http option was not called');
11-
assert.equal(1, response, 'http server "request" callback was not called');
12-
});
13-
14-
var server = http.createServer(function(req, res) {
7+
const server = http.createServer(common.mustCall(function(req, res) {
158
res.end();
16-
response++;
17-
}).listen(common.PORT, '127.0.0.1', function() {
18-
http.get({ createConnection: createConnection }, function(res) {
9+
}, 4)).listen(common.PORT, '127.0.0.1', function() {
10+
let fn = common.mustCall(createConnection);
11+
http.get({ createConnection: fn }, function(res) {
1912
res.resume();
20-
server.close();
13+
fn = common.mustCall(createConnectionAsync);
14+
http.get({ createConnection: fn }, function(res) {
15+
res.resume();
16+
fn = common.mustCall(createConnectionBoth1);
17+
http.get({ createConnection: fn }, function(res) {
18+
res.resume();
19+
fn = common.mustCall(createConnectionBoth2);
20+
http.get({ createConnection: fn }, function(res) {
21+
res.resume();
22+
fn = common.mustCall(createConnectionError);
23+
http.get({ createConnection: fn }, function(res) {
24+
assert.fail(null, null, 'Unexpected response callback');
25+
}).on('error', common.mustCall(function(err) {
26+
assert.equal(err.message, 'Could not create socket');
27+
server.close();
28+
}));
29+
});
30+
});
31+
});
2132
});
2233
});
2334

2435
function createConnection() {
25-
create++;
2636
return net.createConnection(common.PORT, '127.0.0.1');
2737
}
38+
39+
function createConnectionAsync(options, cb) {
40+
setImmediate(function() {
41+
cb(null, net.createConnection(common.PORT, '127.0.0.1'));
42+
});
43+
}
44+
45+
function createConnectionBoth1(options, cb) {
46+
const socket = net.createConnection(common.PORT, '127.0.0.1');
47+
setImmediate(function() {
48+
cb(null, socket);
49+
});
50+
return socket;
51+
}
52+
53+
function createConnectionBoth2(options, cb) {
54+
const socket = net.createConnection(common.PORT, '127.0.0.1');
55+
cb(null, socket);
56+
return socket;
57+
}
58+
59+
function createConnectionError(options, cb) {
60+
process.nextTick(cb, new Error('Could not create socket'));
61+
}

0 commit comments

Comments
 (0)