Skip to content

Commit

Permalink
Splits migrationsv2 actions and unit tests into separate files (#101200)
Browse files Browse the repository at this point in the history
* Splits migrationsv2 actions and unit tests into separate files

* Moves actions integration tests
  • Loading branch information
TinaHeiligers authored and kibanamachine committed Jun 3, 2021
1 parent fdbdddf commit e1d0771
Show file tree
Hide file tree
Showing 44 changed files with 2,544 additions and 1,539 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';

describe('bulkOverwriteTransformedDocuments', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});
try {
await task();
} catch (e) {
/** ignore */
}

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '../../../elasticsearch';
import type { SavedObjectsRawDoc } from '../../serialization';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
index: string;
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}
/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
*/
export const bulkOverwriteTransformedDocuments = ({
client,
index,
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
'bulk_index_succeeded'
> => () => {
return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
// can't bulkIndex to an alias with require_alias=true. This means if
// users tamper during this operation (delete indices or restore a
// snapshot), we could end up auto-creating an index without the correct
// mappings. Such tampering could lead to many other problems and is
// probably unlikely so for now we'll accept this risk and wait till
// system indices puts in place a hard control.
require_alias: false,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
// that another instance already updated these documents
const errors = (res.body.items ?? []).filter(
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
);
if (errors.length === 0) {
return Either.right('bulk_index_succeeded' as const);
} else {
throw new Error(JSON.stringify(errors));
}
})
.catch(catchRetryableEsClientErrors);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { errors as EsErrors } from '@elastic/elasticsearch';
import { cloneIndex } from './clone_index';
import { setWriteBlock } from './set_write_block';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');

describe('cloneIndex', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const nonRetryableError = new Error('crash');
const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('re-throws non retry-able errors', async () => {
const task = setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
});
141 changes: 141 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/clone_index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { pipe } from 'fp-ts/lib/pipeable';
import { ElasticsearchClient } from '../../../elasticsearch';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import type { IndexNotFound, AcknowledgeResponse } from './';
import { waitForIndexStatusYellow } from './wait_for_index_status_yellow';
import {
DEFAULT_TIMEOUT,
INDEX_AUTO_EXPAND_REPLICAS,
INDEX_NUMBER_OF_SHARDS,
WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
} from './constants';
export type CloneIndexResponse = AcknowledgeResponse;

/** @internal */
export interface CloneIndexParams {
client: ElasticsearchClient;
source: string;
target: string;
/** only used for testing */
timeout?: string;
}
/**
* Makes a clone of the source index into the target.
*
* @remarks
* This method adds some additional logic to the ES clone index API:
* - it is idempotent, if it gets called multiple times subsequent calls will
* wait for the first clone operation to complete (up to 60s)
* - the first call will wait up to 120s for the cluster state and all shards
* to be updated.
*/
export const cloneIndex = ({
client,
source,
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
AcknowledgeResponse
> = () => {
return client.indices
.clone(
{
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
body: {
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
number_of_shards: INDEX_NUMBER_OF_SHARDS,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
},
},
timeout,
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
)
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated with the newly created index, but it probably will be
* created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, cloning complete
*/
return Either.right({
acknowledged: res.body.acknowledged,
shardsAcknowledged: res.body.shards_acknowledged,
});
})
.catch((error: EsErrors.ResponseError) => {
if (error?.body?.error?.type === 'index_not_found_exception') {
return Either.left({
type: 'index_not_found_exception' as const,
index: error.body.error.index,
});
} else if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous clone
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
} else {
throw error;
}
})
.catch(catchRetryableEsClientErrors);
};

return pipe(
cloneTask,
TaskEither.chain((res) => {
if (res.acknowledged && res.shardsAcknowledged) {
// If the cluster state was updated and all shards ackd we're done
return TaskEither.right(res);
} else {
// Otherwise, wait until the target index has a 'green' status.
return pipe(
waitForIndexStatusYellow({ client, index: target, timeout }),
TaskEither.map((value) => {
/** When the index status is 'green' we know that all shards were started */
return { acknowledged: true, shardsAcknowledged: true };
})
);
}
})
);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { closePit } from './close_pit';

describe('closePit', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = closePit({ client, pitId: 'pitId' });
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
Loading

0 comments on commit e1d0771

Please sign in to comment.