-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create HTTP Agent manager #137748
Create HTTP Agent manager #137748
Changes from 35 commits
4265fe3
b3a2d1c
2f0550b
24e731d
bf19f70
941f5db
ca8d434
93d033c
0e8591c
582e4e7
11431a3
a3b153d
46232e7
f891b0a
8270d13
a8b7915
0954407
210a326
df464cc
9acb1dc
41fe166
1aa1a4c
8149c53
e2b53d6
fca3af2
0d0ba54
5dcb57a
5038277
0ff81aa
01a4d17
53a25f1
afe9f35
5aae94b
74a4886
72d8388
290bcff
86b14b9
6331e35
9437590
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
import { AgentManager } from './agent_manager'; | ||
import { Agent as HttpAgent } from 'http'; | ||
import { Agent as HttpsAgent } from 'https'; | ||
|
||
jest.mock('http'); | ||
jest.mock('https'); | ||
|
||
const HttpAgentMock = HttpAgent as jest.Mock<HttpAgent>; | ||
const HttpsAgentMock = HttpsAgent as jest.Mock<HttpsAgent>; | ||
|
||
describe('AgentManager', () => { | ||
afterEach(() => { | ||
HttpAgentMock.mockClear(); | ||
HttpsAgentMock.mockClear(); | ||
}); | ||
|
||
describe('#getAgentFactory()', () => { | ||
it('provides factories which are different at each call', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory1 = agentManager.getAgentFactory(); | ||
const agentFactory2 = agentManager.getAgentFactory(); | ||
expect(agentFactory1).not.toEqual(agentFactory2); | ||
}); | ||
|
||
describe('one agent factory', () => { | ||
it('provides instances of the http and https Agent classes', () => { | ||
const mockedHttpAgent = new HttpAgent(); | ||
HttpAgentMock.mockImplementationOnce(() => mockedHttpAgent); | ||
const mockedHttpsAgent = new HttpsAgent(); | ||
HttpsAgentMock.mockImplementationOnce(() => mockedHttpsAgent); | ||
const agentManager = new AgentManager(); | ||
const agentFactory = agentManager.getAgentFactory(); | ||
const httpAgent = agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
const httpsAgent = agentFactory({ url: new URL('https://elastic-node-1:9200') }); | ||
expect(httpAgent).toEqual(mockedHttpAgent); | ||
expect(httpsAgent).toEqual(mockedHttpsAgent); | ||
}); | ||
|
||
it('provides Agents with a valid default configuration', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory = agentManager.getAgentFactory(); | ||
agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
expect(HttpAgent).toBeCalledTimes(1); | ||
expect(HttpAgent).toBeCalledWith({ | ||
keepAlive: true, | ||
keepAliveMsecs: 50000, | ||
maxFreeSockets: 256, | ||
maxSockets: 256, | ||
scheduling: 'lifo', | ||
}); | ||
}); | ||
|
||
it('takes into account the provided configurations', () => { | ||
const agentManager = new AgentManager({ maxFreeSockets: 32, maxSockets: 2048 }); | ||
const agentFactory = agentManager.getAgentFactory({ | ||
maxSockets: 1024, | ||
scheduling: 'fifo', | ||
}); | ||
agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
expect(HttpAgent).toBeCalledTimes(1); | ||
expect(HttpAgent).toBeCalledWith({ | ||
keepAlive: true, | ||
keepAliveMsecs: 50000, | ||
maxFreeSockets: 32, | ||
maxSockets: 1024, | ||
scheduling: 'fifo', | ||
}); | ||
}); | ||
|
||
it('provides Agents that match the URLs protocol', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory = agentManager.getAgentFactory(); | ||
agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
expect(HttpAgent).toHaveBeenCalledTimes(1); | ||
expect(HttpsAgent).toHaveBeenCalledTimes(0); | ||
agentFactory({ url: new URL('https://elastic-node-3:9200') }); | ||
expect(HttpAgent).toHaveBeenCalledTimes(1); | ||
expect(HttpsAgent).toHaveBeenCalledTimes(1); | ||
}); | ||
|
||
it('provides the same Agent iif URLs use the same protocol', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory = agentManager.getAgentFactory(); | ||
const agent1 = agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
const agent2 = agentFactory({ url: new URL('http://elastic-node-2:9200') }); | ||
const agent3 = agentFactory({ url: new URL('https://elastic-node-3:9200') }); | ||
const agent4 = agentFactory({ url: new URL('https://elastic-node-4:9200') }); | ||
|
||
expect(agent1).toEqual(agent2); | ||
expect(agent1).not.toEqual(agent3); | ||
expect(agent3).toEqual(agent4); | ||
}); | ||
|
||
it('dereferences an agent instance when the agent is closed', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory = agentManager.getAgentFactory(); | ||
const agent = agentFactory({ url: new URL('http://elastic-node-1:9200') }); | ||
// eslint-disable-next-line dot-notation | ||
expect(agentManager['httpStore'].has(agent)).toEqual(true); | ||
agent.destroy(); | ||
// eslint-disable-next-line dot-notation | ||
expect(agentManager['httpStore'].has(agent)).toEqual(false); | ||
}); | ||
}); | ||
|
||
describe('two agent factories', () => { | ||
it('never provide the same Agent instance even if they use the same type', () => { | ||
const agentManager = new AgentManager(); | ||
const agentFactory1 = agentManager.getAgentFactory(); | ||
const agentFactory2 = agentManager.getAgentFactory(); | ||
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') }); | ||
const agent2 = agentFactory2({ url: new URL('http://elastic-node-1:9200') }); | ||
expect(agent1).not.toEqual(agent2); | ||
}); | ||
}); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
import { Agent as HttpAgent } from 'http'; | ||
import { Agent as HttpsAgent } from 'https'; | ||
import { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch'; | ||
|
||
const HTTPS = 'https:'; | ||
const DEFAULT_CONFIG: HttpAgentOptions = { | ||
keepAlive: true, | ||
keepAliveMsecs: 50000, | ||
maxSockets: 256, | ||
maxFreeSockets: 256, | ||
scheduling: 'lifo', | ||
}; | ||
|
||
export type NetworkAgent = HttpAgent | HttpsAgent; | ||
export type AgentFactory = (connectionOpts: ConnectionOptions) => NetworkAgent; | ||
|
||
/** | ||
* Allows obtaining Agent factories, which can then be fed into elasticsearch-js's Client class. | ||
* Ideally, we should obtain one Agent factory for each ES Client class. | ||
* This allows using the same Agent across all the Pools and Connections of the Client (one per ES node). | ||
* | ||
* Agent instances are stored internally to allow collecting metrics (nbr of active/idle connections to ES). | ||
* | ||
* Using the same Agent factory across multiple ES Client instances is strongly discouraged, cause ES Client | ||
* exposes methods that can modify the underlying pools, effectively impacting the connections of other Clients. | ||
* @internal | ||
**/ | ||
export class AgentManager { | ||
// Stores Https Agent instances | ||
private httpsStore: Set<HttpsAgent>; | ||
// Stores Http Agent instances | ||
private httpStore: Set<HttpAgent>; | ||
|
||
constructor(private agentOptions: HttpAgentOptions = DEFAULT_CONFIG) { | ||
this.httpsStore = new Set(); | ||
this.httpStore = new Set(); | ||
} | ||
|
||
public getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory { | ||
// a given agent factory always provides the same Agent instances (for the same protocol) | ||
// we keep references to the instances at factory level, to be able to reuse them | ||
let httpAgent: HttpAgent; | ||
let httpsAgent: HttpsAgent; | ||
|
||
return (connectionOpts: ConnectionOptions): NetworkAgent => { | ||
if (connectionOpts.url.protocol === HTTPS) { | ||
if (!httpsAgent) { | ||
rudolf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const config = Object.assign( | ||
{}, | ||
DEFAULT_CONFIG, | ||
this.agentOptions, | ||
agentOptions, | ||
connectionOpts.tls | ||
); | ||
httpsAgent = new HttpsAgent(config); | ||
this.httpsStore.add(httpsAgent); | ||
dereferenceOnDestroy(this.httpsStore, httpsAgent); | ||
} | ||
|
||
return httpsAgent; | ||
} | ||
|
||
if (!httpAgent) { | ||
const config = Object.assign({}, DEFAULT_CONFIG, this.agentOptions, agentOptions); | ||
httpAgent = new HttpAgent(config); | ||
this.httpStore.add(httpAgent); | ||
dereferenceOnDestroy(this.httpStore, httpAgent); | ||
} | ||
|
||
return httpAgent; | ||
}; | ||
} | ||
} | ||
|
||
const dereferenceOnDestroy = (protocolStore: Set<NetworkAgent>, agent: NetworkAgent) => { | ||
const doDestroy = agent.destroy.bind(agent); | ||
agent.destroy = () => { | ||
protocolStore.delete(agent); | ||
doDestroy(); | ||
}; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ | |
import { ConnectionOptions as TlsConnectionOptions } from 'tls'; | ||
import { URL } from 'url'; | ||
import { Duration } from 'moment'; | ||
import type { ClientOptions } from '@elastic/elasticsearch/lib/client'; | ||
import type { ClientOptions, HttpAgentOptions } from '@elastic/elasticsearch'; | ||
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server'; | ||
import { DEFAULT_HEADERS } from './headers'; | ||
|
||
|
@@ -23,8 +23,8 @@ import { DEFAULT_HEADERS } from './headers'; | |
export function parseClientOptions( | ||
config: ElasticsearchClientConfig, | ||
scoped: boolean | ||
): ClientOptions { | ||
const clientOptions: ClientOptions = { | ||
): ClientOptions & { agent: HttpAgentOptions } { | ||
const clientOptions: ClientOptions & { agent: HttpAgentOptions } = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: I would introduce an explicit type instead of an inline def export type ParsedClientOptions = Omit<ClientOptions, 'agent'> & { agent: HttpAgentOptions } |
||
sniffOnStart: config.sniffOnStart, | ||
sniffOnConnectionFault: config.sniffOnConnectionFault, | ||
headers: { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ import { httpServerMock, httpServiceMock } from '@kbn/core-http-server-mocks'; | |
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server'; | ||
import { ClusterClient } from './cluster_client'; | ||
import { DEFAULT_HEADERS } from './headers'; | ||
import { AgentManager } from './agent_manager'; | ||
|
||
const createConfig = ( | ||
parts: Partial<ElasticsearchClientConfig> = {} | ||
|
@@ -83,17 +84,51 @@ describe('ClusterClient', () => { | |
expect(configureClientMock).toHaveBeenCalledTimes(2); | ||
expect(configureClientMock).toHaveBeenCalledWith(config, { | ||
logger, | ||
agentManager: undefined, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: add test to ensure that the agent manager is passed down to the |
||
type: 'custom-type', | ||
getExecutionContext: getExecutionContextMock, | ||
}); | ||
expect(configureClientMock).toHaveBeenCalledWith(config, { | ||
logger, | ||
agentManager: undefined, | ||
type: 'custom-type', | ||
getExecutionContext: getExecutionContextMock, | ||
scoped: true, | ||
}); | ||
}); | ||
|
||
describe('when an AgentManager is provided', () => { | ||
it('calls configureClient passing in the provided AgentManager', () => { | ||
const config = createConfig(); | ||
const getExecutionContextMock = jest.fn(); | ||
const agentManager = new AgentManager(); | ||
|
||
new ClusterClient({ | ||
config, | ||
logger, | ||
authHeaders, | ||
agentManager, | ||
type: 'custom-type', | ||
getExecutionContext: getExecutionContextMock, | ||
}); | ||
|
||
expect(configureClientMock).toHaveBeenCalledTimes(2); | ||
expect(configureClientMock).toHaveBeenCalledWith(config, { | ||
logger, | ||
agentManager, | ||
type: 'custom-type', | ||
getExecutionContext: getExecutionContextMock, | ||
}); | ||
expect(configureClientMock).toHaveBeenCalledWith(config, { | ||
logger, | ||
agentManager, | ||
type: 'custom-type', | ||
getExecutionContext: getExecutionContextMock, | ||
scoped: true, | ||
}); | ||
}); | ||
}); | ||
|
||
describe('#asInternalUser', () => { | ||
it('returns the internal client', () => { | ||
const clusterClient = new ClusterClient({ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import { ScopedClusterClient } from './scoped_cluster_client'; | |
import { DEFAULT_HEADERS } from './headers'; | ||
import { createInternalErrorHandler, InternalUnauthorizedErrorHandler } from './retry_unauthorized'; | ||
import { createTransport } from './create_transport'; | ||
import { AgentManager } from './agent_manager'; | ||
|
||
const noop = () => undefined; | ||
|
||
|
@@ -47,25 +48,33 @@ export class ClusterClient implements ICustomClusterClient { | |
authHeaders, | ||
getExecutionContext = noop, | ||
getUnauthorizedErrorHandler = noop, | ||
agentManager, | ||
}: { | ||
config: ElasticsearchClientConfig; | ||
logger: Logger; | ||
type: string; | ||
authHeaders?: IAuthHeadersStorage; | ||
getExecutionContext?: () => string | undefined; | ||
getUnauthorizedErrorHandler?: () => UnauthorizedErrorHandler | undefined; | ||
agentManager?: AgentManager; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May we want to make this mandatory instead of optional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having it optional is an easy way to fallback to the previous behavior (as per my comment above). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when an agentManager is not supplied I'd say our clients no longer behave as expected e.g. the meaning of maxSockets will change. It's only core using this constructor so it would be easy to change if we switch to undici. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough, I'll make it mandatory. |
||
}) { | ||
this.config = config; | ||
this.authHeaders = authHeaders; | ||
this.getExecutionContext = getExecutionContext; | ||
this.getUnauthorizedErrorHandler = getUnauthorizedErrorHandler; | ||
|
||
this.asInternalUser = configureClient(config, { logger, type, getExecutionContext }); | ||
this.asInternalUser = configureClient(config, { | ||
logger, | ||
type, | ||
getExecutionContext, | ||
agentManager, | ||
}); | ||
this.rootScopedClient = configureClient(config, { | ||
logger, | ||
type, | ||
getExecutionContext, | ||
scoped: true, | ||
agentManager, | ||
}); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL: that's true in a test environment with
I would expect both to be
undefined
given the constructor has been mocked without specifying return values.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jest seems to be mocking the class automatically, and returning an instance that mocks all its methods too.