Skip to content

Commit

Permalink
Synchronously update internal sockets length so http.Agent pooling is…
Browse files Browse the repository at this point in the history
… used (#300)

* Synchronously update internal sockets length so http.Agent pooling is used

Fixes #299

* Fix some comments

* Use fewer ts-expect-error comments

* Implement getName method to get both http/https names

* Fix linting

* Create seven-colts-flash.md

* Make getName() public and add tests for it

* Add test for keepAlive+maxSockets when using https
  • Loading branch information
lukekarrys authored Mar 30, 2024
1 parent b5f94e3 commit e62863c
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/seven-colts-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agent-base": patch
---

Synchronously update internal sockets length so `http.Agent` pooling is used
85 changes: 77 additions & 8 deletions packages/agent-base/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as net from 'net';
import * as tls from 'tls';
import * as http from 'http';
import { Agent as HttpsAgent } from 'https';
import type { Duplex } from 'stream';

export * from './helpers';
Expand Down Expand Up @@ -77,6 +78,65 @@ export abstract class Agent extends http.Agent {
);
}

// In order to support async signatures in `connect()` and Node's native
// connection pooling in `http.Agent`, the array of sockets for each origin
// has to be updated synchronously. This is so the length of the array is
// accurate when `addRequest()` is next called. We achieve this by creating a
// fake socket and adding it to `sockets[origin]` and incrementing
// `totalSocketCount`.
private incrementSockets(name: string) {
// If `maxSockets` and `maxTotalSockets` are both Infinity then there is no
// need to create a fake socket because Node.js native connection pooling
// will never be invoked.
if (this.maxSockets === Infinity && this.maxTotalSockets === Infinity) {
return null;
}
// All instances of `sockets` are expected TypeScript errors. The
// alternative is to add it as a private property of this class but that
// will break TypeScript subclassing.
if (!this.sockets[name]) {
// @ts-expect-error `sockets` is readonly in `@types/node`
this.sockets[name] = [];
}
const fakeSocket = new net.Socket({ writable: false });
(this.sockets[name] as net.Socket[]).push(fakeSocket);
// @ts-expect-error `totalSocketCount` isn't defined in `@types/node`
this.totalSocketCount++;
return fakeSocket;
}

private decrementSockets(name: string, socket: null | net.Socket) {
if (!this.sockets[name] || socket === null) {
return;
}
const sockets = this.sockets[name] as net.Socket[];
const index = sockets.indexOf(socket);
if (index !== -1) {
sockets.splice(index, 1);
// @ts-expect-error `totalSocketCount` isn't defined in `@types/node`
this.totalSocketCount--;
if (sockets.length === 0) {
// @ts-expect-error `sockets` is readonly in `@types/node`
delete this.sockets[name];
}
}
}

// In order to properly update the socket pool, we need to call `getName()` on
// the core `https.Agent` if it is a secureEndpoint.
getName(options: AgentConnectOpts): string {
const secureEndpoint =
typeof options.secureEndpoint === 'boolean'
? options.secureEndpoint
: this.isSecureEndpoint(options);
if (secureEndpoint) {
// @ts-expect-error `getName()` isn't defined in `@types/node`
return HttpsAgent.prototype.getName.call(this, options);
}
// @ts-expect-error `getName()` isn't defined in `@types/node`
return super.getName(options);
}

createSocket(
req: http.ClientRequest,
options: AgentConnectOpts,
Expand All @@ -86,17 +146,26 @@ export abstract class Agent extends http.Agent {
...options,
secureEndpoint: this.isSecureEndpoint(options),
};
const name = this.getName(connectOpts);
const fakeSocket = this.incrementSockets(name);
Promise.resolve()
.then(() => this.connect(req, connectOpts))
.then((socket) => {
if (socket instanceof http.Agent) {
// @ts-expect-error `addRequest()` isn't defined in `@types/node`
return socket.addRequest(req, connectOpts);
.then(
(socket) => {
this.decrementSockets(name, fakeSocket);
if (socket instanceof http.Agent) {
// @ts-expect-error `addRequest()` isn't defined in `@types/node`
return socket.addRequest(req, connectOpts);
}
this[INTERNAL].currentSocket = socket;
// @ts-expect-error `createSocket()` isn't defined in `@types/node`
super.createSocket(req, options, cb);
},
(err) => {
this.decrementSockets(name, fakeSocket);
cb(err);
}
this[INTERNAL].currentSocket = socket;
// @ts-expect-error `createSocket()` isn't defined in `@types/node`
super.createSocket(req, options, cb);
}, cb);
);
}

createConnection(): Duplex {
Expand Down
117 changes: 117 additions & 0 deletions packages/agent-base/test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ describe('Agent (TypeScript)', () => {
) {
gotCallback = true;
assert(opts.secureEndpoint === false);
assert.equal(this.getName(opts), `127.0.0.1:${port}:`);
return net.connect(opts);
}
}
Expand Down Expand Up @@ -308,6 +309,60 @@ describe('Agent (TypeScript)', () => {
server2.close();
}
});

it('should support `keepAlive: true` with `maxSockets`', async () => {
let reqCount = 0;
let connectCount = 0;

class MyAgent extends Agent {
async connect(
_req: http.ClientRequest,
opts: AgentConnectOpts
) {
connectCount++;
assert(opts.secureEndpoint === false);
await sleep(10);
return net.connect(opts);
}
}
const agent = new MyAgent({ keepAlive: true, maxSockets: 1 });

const server = http.createServer(async (req, res) => {
expect(req.headers.connection).toEqual('keep-alive');
reqCount++;
await sleep(10);
res.end();
});
const addr = await listen(server);

try {
const resPromise = req(new URL('/foo', addr), { agent });
const res2Promise = req(new URL('/another', addr), {
agent,
});

const res = await resPromise;
expect(reqCount).toEqual(1);
expect(connectCount).toEqual(1);
expect(res.headers.connection).toEqual('keep-alive');

res.resume();
const s1 = res.socket;
await once(s1, 'free');

const res2 = await res2Promise;
expect(reqCount).toEqual(2);
expect(connectCount).toEqual(1);
expect(res2.headers.connection).toEqual('keep-alive');
assert(res2.socket === s1);

res2.resume();
await once(res2.socket, 'free');
} finally {
agent.destroy();
server.close();
}
});
});

describe('"https" module', () => {
Expand All @@ -322,6 +377,10 @@ describe('Agent (TypeScript)', () => {
): net.Socket {
gotCallback = true;
assert(opts.secureEndpoint === true);
assert.equal(
this.getName(opts),
`127.0.0.1:${port}::::::::false:::::::::::::`
);
return tls.connect(opts);
}
}
Expand Down Expand Up @@ -509,5 +568,63 @@ describe('Agent (TypeScript)', () => {
server.close();
}
});

it('should support `keepAlive: true` with `maxSockets`', async () => {
let reqCount = 0;
let connectCount = 0;

class MyAgent extends Agent {
async connect(
_req: http.ClientRequest,
opts: AgentConnectOpts
) {
connectCount++;
assert(opts.secureEndpoint === true);
await sleep(10);
return tls.connect(opts);
}
}
const agent = new MyAgent({ keepAlive: true, maxSockets: 1 });

const server = https.createServer(sslOptions, async (req, res) => {
expect(req.headers.connection).toEqual('keep-alive');
reqCount++;
await sleep(10);
res.end();
});
const addr = await listen(server);

try {
const resPromise = req(new URL('/foo', addr), {
agent,
rejectUnauthorized: false,
});
const res2Promise = req(new URL('/another', addr), {
agent,
rejectUnauthorized: false,
});

const res = await resPromise;
expect(reqCount).toEqual(1);
expect(connectCount).toEqual(1);
expect(res.headers.connection).toEqual('keep-alive');

res.resume();
const s1 = res.socket;
await once(s1, 'free');

const res2 = await res2Promise;
expect(reqCount).toEqual(2);
expect(connectCount).toEqual(1);
expect(res2.headers.connection).toEqual('keep-alive');
assert(res2.socket === s1);

res2.resume();
await once(res2.socket, 'free');
} finally {
agent.destroy();
server.close();
}
});
});
});

0 comments on commit e62863c

Please sign in to comment.