diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 022a03e01d37d..2667859f303d4 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service'; import { elasticsearchServiceMock } from './elasticsearch_service.mock'; import { duration } from 'moment'; +const delay = async (durationMs: number) => + await new Promise(resolve => setTimeout(resolve, durationMs)); + let elasticsearchService: ElasticsearchService; const configService = configServiceMock.create(); const deps = { @@ -42,7 +45,7 @@ configService.atPath.mockReturnValue( new BehaviorSubject({ hosts: ['http://1.2.3.4'], healthCheck: { - delay: duration(2000), + delay: duration(10), }, ssl: { verificationMode: 'none', @@ -125,21 +128,21 @@ describe('#setup', () => { const config = MockClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` -Object { - "healthCheckDelay": "PT2S", - "hosts": Array [ - "http://8.8.8.8", - ], - "logQueries": true, - "requestHeadersWhitelist": Array [ - undefined, - ], - "ssl": Object { - "certificate": "certificate-value", - "verificationMode": "none", - }, -} -`); + Object { + "healthCheckDelay": "PT0.01S", + "hosts": Array [ + "http://8.8.8.8", + ], + "logQueries": true, + "requestHeadersWhitelist": Array [ + undefined, + ], + "ssl": Object { + "certificate": "certificate-value", + "verificationMode": "none", + }, + } + `); }); it('falls back to elasticsearch config if custom config not passed', async () => { const setupContract = await elasticsearchService.setup(deps); @@ -150,24 +153,24 @@ Object { const config = MockClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` -Object { - "healthCheckDelay": "PT2S", - "hosts": Array [ - "http://1.2.3.4", - ], - "requestHeadersWhitelist": Array [ - undefined, - ], - "ssl": Object { - "alwaysPresentCertificate": undefined, - "certificate": undefined, - "certificateAuthorities": undefined, - "key": undefined, - "keyPassphrase": undefined, - "verificationMode": "none", - }, -} -`); + Object { + "healthCheckDelay": "PT0.01S", + "hosts": Array [ + "http://1.2.3.4", + ], + "requestHeadersWhitelist": Array [ + undefined, + ], + "ssl": Object { + "alwaysPresentCertificate": undefined, + "certificate": undefined, + "certificateAuthorities": undefined, + "key": undefined, + "keyPassphrase": undefined, + "verificationMode": "none", + }, + } + `); }); it('does not merge elasticsearch hosts if custom config overrides', async () => { @@ -213,6 +216,45 @@ Object { `); }); }); + + it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => { + const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + await delay(10); + + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + setupContract.esNodesCompatibility$.subscribe(() => { + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); + }); + + it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => { + const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + const sub = setupContract.esNodesCompatibility$.subscribe(async () => { + sub.unsubscribe(); + await delay(100); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); + }); }); describe('#stop', () => { @@ -229,4 +271,27 @@ describe('#stop', () => { expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1); expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1); }); + + it('stops pollEsNodeVersions even if there are active subscriptions', async done => { + expect.assertions(2); + const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); + + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + + setupContract.esNodesCompatibility$.subscribe(async () => { + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + + await elasticsearchService.stop(); + await delay(100); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); + }); }); diff --git a/src/core/server/elasticsearch/elasticsearch_service.ts b/src/core/server/elasticsearch/elasticsearch_service.ts index 4d897dbd8f7aa..be54094ab9cb9 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.ts @@ -17,8 +17,17 @@ * under the License. */ -import { ConnectableObservable, Observable, Subscription } from 'rxjs'; -import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators'; +import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs'; +import { + filter, + first, + map, + publishReplay, + switchMap, + take, + shareReplay, + takeUntil, +} from 'rxjs/operators'; import { CoreService } from '../../types'; import { merge } from '../../utils'; @@ -47,13 +56,8 @@ interface SetupDeps { export class ElasticsearchService implements CoreService { private readonly log: Logger; private readonly config$: Observable; - private subscriptions: { - client?: Subscription; - esNodesCompatibility?: Subscription; - } = { - client: undefined, - esNodesCompatibility: undefined, - }; + private subscription: Subscription | undefined; + private stop$ = new Subject(); private kibanaVersion: string; constructor(private readonly coreContext: CoreContext) { @@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService { - if (this.subscriptions.client !== undefined) { + if (this.subscription !== undefined) { this.log.error('Clients cannot be changed after they are created'); return false; } @@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService; - this.subscriptions.client = clients$.connect(); + this.subscription = clients$.connect(); const config = await this.config$.pipe(first()).toPromise(); @@ -164,18 +168,7 @@ export class ElasticsearchService implements CoreService).connect(); - - // TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983 - esNodesCompatibility$.subscribe(({ isCompatible, message }) => { - if (!isCompatible && message) { - this.log.error(message); - } - }); + }).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 })); return { legacy: { config$: clients$.pipe(map(clients => clients.config)) }, @@ -195,12 +188,10 @@ export class ElasticsearchService implements CoreService { + if (!isCompatible && message) { + this.logger.error(message); + } + }); + await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe( filter(nodes => nodes.isCompatible), take(1)