Skip to content

Commit

Permalink
Merge branch 'master' into remote_clusters/np
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Mar 2, 2020
2 parents f561cac + c9ebeb7 commit 02b6ca6
Show file tree
Hide file tree
Showing 54 changed files with 931 additions and 4,836 deletions.
12 changes: 5 additions & 7 deletions src/cli/serve/serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import { getConfigPath } from '../../core/server/path';
import { bootstrap } from '../../core/server';
import { readKeystore } from './read_keystore';

import { DEV_SSL_CERT_PATH, DEV_SSL_KEY_PATH } from '../dev_ssl';

function canRequire(path) {
try {
require.resolve(path);
Expand Down Expand Up @@ -90,7 +88,7 @@ function applyConfigOverrides(rawConfig, opts, extraCliOptions) {

if (opts.ssl) {
// @kbn/dev-utils is part of devDependencies
const { CA_CERT_PATH } = require('@kbn/dev-utils');
const { CA_CERT_PATH, KBN_KEY_PATH, KBN_CERT_PATH } = require('@kbn/dev-utils');
const customElasticsearchHosts = opts.elasticsearch
? opts.elasticsearch.split(',')
: [].concat(get('elasticsearch.hosts') || []);
Expand All @@ -104,6 +102,7 @@ function applyConfigOverrides(rawConfig, opts, extraCliOptions) {
ensureNotDefined('server.ssl.key');
ensureNotDefined('server.ssl.keystore.path');
ensureNotDefined('server.ssl.truststore.path');
ensureNotDefined('server.ssl.certificateAuthorities');
ensureNotDefined('elasticsearch.ssl.certificateAuthorities');

const elasticsearchHosts = (
Expand All @@ -121,10 +120,9 @@ function applyConfigOverrides(rawConfig, opts, extraCliOptions) {
});

set('server.ssl.enabled', true);
// TODO: change this cert/key to KBN_CERT_PATH and KBN_KEY_PATH from '@kbn/dev-utils'; will require some work to avoid breaking
// functional tests. Once that is done, the existing test cert/key at DEV_SSL_CERT_PATH and DEV_SSL_KEY_PATH can be deleted.
set('server.ssl.certificate', DEV_SSL_CERT_PATH);
set('server.ssl.key', DEV_SSL_KEY_PATH);
set('server.ssl.certificate', KBN_CERT_PATH);
set('server.ssl.key', KBN_KEY_PATH);
set('server.ssl.certificateAuthorities', CA_CERT_PATH);
set('elasticsearch.hosts', elasticsearchHosts);
set('elasticsearch.ssl.certificateAuthorities', CA_CERT_PATH);
}
Expand Down
133 changes: 99 additions & 34 deletions src/core/server/elasticsearch/elasticsearch_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service';
import { elasticsearchServiceMock } from './elasticsearch_service.mock';
import { duration } from 'moment';

const delay = async (durationMs: number) =>
await new Promise(resolve => setTimeout(resolve, durationMs));

let elasticsearchService: ElasticsearchService;
const configService = configServiceMock.create();
const deps = {
Expand All @@ -42,7 +45,7 @@ configService.atPath.mockReturnValue(
new BehaviorSubject({
hosts: ['http://1.2.3.4'],
healthCheck: {
delay: duration(2000),
delay: duration(10),
},
ssl: {
verificationMode: 'none',
Expand Down Expand Up @@ -125,21 +128,21 @@ describe('#setup', () => {

const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://8.8.8.8",
],
"logQueries": true,
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"certificate": "certificate-value",
"verificationMode": "none",
},
}
`);
Object {
"healthCheckDelay": "PT0.01S",
"hosts": Array [
"http://8.8.8.8",
],
"logQueries": true,
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"certificate": "certificate-value",
"verificationMode": "none",
},
}
`);
});
it('falls back to elasticsearch config if custom config not passed', async () => {
const setupContract = await elasticsearchService.setup(deps);
Expand All @@ -150,24 +153,24 @@ Object {

const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://1.2.3.4",
],
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"alwaysPresentCertificate": undefined,
"certificate": undefined,
"certificateAuthorities": undefined,
"key": undefined,
"keyPassphrase": undefined,
"verificationMode": "none",
},
}
`);
Object {
"healthCheckDelay": "PT0.01S",
"hosts": Array [
"http://1.2.3.4",
],
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"alwaysPresentCertificate": undefined,
"certificate": undefined,
"certificateAuthorities": undefined,
"key": undefined,
"keyPassphrase": undefined,
"verificationMode": "none",
},
}
`);
});

it('does not merge elasticsearch hosts if custom config overrides', async () => {
Expand Down Expand Up @@ -213,6 +216,45 @@ Object {
`);
});
});

it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);
await delay(10);

expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
setupContract.esNodesCompatibility$.subscribe(() => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});

it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);

expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
sub.unsubscribe();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
});

describe('#stop', () => {
Expand All @@ -229,4 +271,27 @@ describe('#stop', () => {
expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1);
expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1);
});

it('stops pollEsNodeVersions even if there are active subscriptions', async done => {
expect.assertions(2);
const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();

MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);

setupContract.esNodesCompatibility$.subscribe(async () => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);

await elasticsearchService.stop();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
});
49 changes: 20 additions & 29 deletions src/core/server/elasticsearch/elasticsearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@
* under the License.
*/

import { ConnectableObservable, Observable, Subscription } from 'rxjs';
import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators';
import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs';
import {
filter,
first,
map,
publishReplay,
switchMap,
take,
shareReplay,
takeUntil,
} from 'rxjs/operators';

import { CoreService } from '../../types';
import { merge } from '../../utils';
Expand Down Expand Up @@ -47,13 +56,8 @@ interface SetupDeps {
export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> {
private readonly log: Logger;
private readonly config$: Observable<ElasticsearchConfig>;
private subscriptions: {
client?: Subscription;
esNodesCompatibility?: Subscription;
} = {
client: undefined,
esNodesCompatibility: undefined,
};
private subscription: Subscription | undefined;
private stop$ = new Subject();
private kibanaVersion: string;

constructor(private readonly coreContext: CoreContext) {
Expand All @@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

const clients$ = this.config$.pipe(
filter(() => {
if (this.subscriptions.client !== undefined) {
if (this.subscription !== undefined) {
this.log.error('Clients cannot be changed after they are created');
return false;
}
Expand Down Expand Up @@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
publishReplay(1)
) as ConnectableObservable<CoreClusterClients>;

this.subscriptions.client = clients$.connect();
this.subscription = clients$.connect();

const config = await this.config$.pipe(first()).toPromise();

Expand Down Expand Up @@ -164,18 +168,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
kibanaVersion: this.kibanaVersion,
}).pipe(publishReplay(1));

this.subscriptions.esNodesCompatibility = (esNodesCompatibility$ as ConnectableObservable<
unknown
>).connect();

// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.log.error(message);
}
});
}).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));

return {
legacy: { config$: clients$.pipe(map(clients => clients.config)) },
Expand All @@ -195,12 +188,10 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

public async stop() {
this.log.debug('Stopping elasticsearch service');
// TODO(TS-3.7-ESLINT)
// eslint-disable-next-line no-unused-expressions
this.subscriptions.client?.unsubscribe();
// eslint-disable-next-line no-unused-expressions
this.subscriptions.esNodesCompatibility?.unsubscribe();
this.subscriptions = { client: undefined, esNodesCompatibility: undefined };
if (this.subscription !== undefined) {
this.subscription.unsubscribe();
}
this.stop$.next();
}

private createClusterClient(
Expand Down
13 changes: 13 additions & 0 deletions src/core/server/elasticsearch/retry_call_cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ describe('migrationsRetryCallCluster', () => {
});
});

it('retries ES API calls that rejects with snapshot_in_progress_exception', () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
callEsApi.mockImplementation(() => {
return i++ <= 2
? Promise.reject({ body: { error: { type: 'snapshot_in_progress_exception' } } })
: Promise.resolve('success');
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
});

it('rejects when ES API calls reject with other errors', async () => {
expect.assertions(3);
const callEsApi = jest.fn();
Expand Down
13 changes: 3 additions & 10 deletions src/core/server/elasticsearch/retry_call_cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ export function migrationsRetryCallCluster(
error instanceof esErrors.AuthenticationException ||
error instanceof esErrors.AuthorizationException ||
// @ts-ignore
error instanceof esErrors.Gone
error instanceof esErrors.Gone ||
error?.body?.error?.type === 'snapshot_in_progress_exception'
);
},
timer(delay),
Expand All @@ -85,15 +86,7 @@ export function migrationsRetryCallCluster(
*
* @param apiCaller
*/

// TODO: Replace with APICaller from './scoped_cluster_client' once #46668 is merged
export function retryCallCluster(
apiCaller: (
endpoint: string,
clientParams: Record<string, any>,
options?: CallAPIOptions
) => Promise<any>
) {
export function retryCallCluster(apiCaller: APICaller) {
return (endpoint: string, clientParams: Record<string, any> = {}, options?: CallAPIOptions) => {
return defer(() => apiCaller(endpoint, clientParams, options))
.pipe(
Expand Down
8 changes: 8 additions & 0 deletions src/core/server/saved_objects/saved_objects_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,14 @@ export class SavedObjectsService
this.logger.info(
'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...'
);

// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
this.setupDeps!.elasticsearch.esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.logger.error(message);
}
});

await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
filter(nodes => nodes.isCompatible),
take(1)
Expand Down
Loading

0 comments on commit 02b6ca6

Please sign in to comment.