From e1d077145eb74ffb6ef6a7f2c36473e80b4c0cbd Mon Sep 17 00:00:00 2001 From: "Christiane (Tina) Heiligers" Date: Thu, 3 Jun 2021 09:54:37 -0700 Subject: [PATCH] Splits migrationsv2 actions and unit tests into separate files (#101200) * Splits migrationsv2 actions and unit tests into separate files * Moves actions integration tests --- ...lk_overwrite_transformed_documents.test.ts | 46 + .../bulk_overwrite_transformed_documents.ts | 84 ++ .../migrationsv2/actions/clone_index.test.ts | 60 + .../migrationsv2/actions/clone_index.ts | 141 ++ .../migrationsv2/actions/close_pit.test.ts | 40 + .../migrationsv2/actions/close_pit.ts | 41 + .../migrationsv2/actions/constants.ts | 20 + .../migrationsv2/actions/create_index.test.ts | 59 + .../migrationsv2/actions/create_index.ts | 145 ++ .../actions/fetch_indices.test.ts | 37 + .../migrationsv2/actions/fetch_indices.ts | 49 + .../migrationsv2/actions/index.test.ts | 346 ----- .../migrationsv2/actions/index.ts | 1267 ++--------------- .../integration_tests/actions.test.ts | 14 +- .../migrationsv2/actions/open_pit.test.ts | 40 + .../migrationsv2/actions/open_pit.ts | 43 + .../actions/pickup_updated_mappings.test.ts | 39 + .../actions/pickup_updated_mappings.ts | 57 + .../actions/read_with_pit.test.ts | 45 + .../migrationsv2/actions/read_with_pit.ts | 92 ++ .../actions/refresh_index.test.ts | 42 + .../migrationsv2/actions/refresh_index.ts | 40 + .../migrationsv2/actions/reindex.test.ts | 48 + .../migrationsv2/actions/reindex.ts | 90 ++ .../actions/remove_write_block.test.ts | 53 + .../actions/remove_write_block.ts | 60 + .../search_for_outdated_documents.test.ts | 69 + .../actions/search_for_outdated_documents.ts | 77 + .../actions/set_write_block.test.ts | 52 + .../migrationsv2/actions/set_write_block.ts | 73 + .../migrationsv2/actions/transform_docs.ts | 30 + .../actions/update_aliases.test.ts | 55 + .../migrationsv2/actions/update_aliases.ts | 98 ++ .../update_and_pickup_mappings.test.ts | 45 + .../actions/update_and_pickup_mappings.ts | 80 ++ .../migrationsv2/actions/verify_reindex.ts | 52 + .../wait_for_index_status_yellow.test.ts | 44 + .../actions/wait_for_index_status_yellow.ts | 45 + ...t_for_pickup_updated_mappings_task.test.ts | 59 + .../wait_for_pickup_updated_mappings_task.ts | 43 + .../actions/wait_for_reindex_task.test.ts | 56 + .../actions/wait_for_reindex_task.ts | 65 + .../actions/wait_for_task.test.ts | 47 + .../migrationsv2/actions/wait_for_task.ts | 95 ++ 44 files changed, 2544 insertions(+), 1539 deletions(-) create mode 100644 src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/clone_index.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/clone_index.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/close_pit.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/close_pit.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/constants.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/create_index.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/create_index.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/fetch_indices.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/fetch_indices.ts delete mode 100644 
src/core/server/saved_objects/migrationsv2/actions/index.test.ts rename src/core/server/saved_objects/migrationsv2/{ => actions}/integration_tests/actions.test.ts (99%) create mode 100644 src/core/server/saved_objects/migrationsv2/actions/open_pit.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/open_pit.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/read_with_pit.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/read_with_pit.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/refresh_index.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/refresh_index.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/reindex.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/reindex.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/remove_write_block.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/remove_write_block.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/set_write_block.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/set_write_block.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/transform_docs.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/update_aliases.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/update_aliases.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/verify_reindex.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_task.test.ts create mode 100644 src/core/server/saved_objects/migrationsv2/actions/wait_for_task.ts diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts new file mode 100644 index 0000000000000..8ff9591798fd4 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents'; + +describe('bulkOverwriteTransformedDocuments', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts new file mode 100644 index 0000000000000..830a8efccc7eb --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import type { estypes } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import type { SavedObjectsRawDoc } from '../../serialization'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants'; + +/** @internal */ +export interface BulkOverwriteTransformedDocumentsParams { + client: ElasticsearchClient; + index: string; + transformedDocs: SavedObjectsRawDoc[]; + refresh?: estypes.Refresh; +} +/** + * Write the up-to-date transformed documents to the index, overwriting any + * documents that are still on their outdated version. + */ +export const bulkOverwriteTransformedDocuments = ({ + client, + index, + transformedDocs, + refresh = false, +}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither< + RetryableEsClientError, + 'bulk_index_succeeded' +> => () => { + return client + .bulk({ + // Because we only add aliases in the MARK_VERSION_INDEX_READY step we + // can't bulkIndex to an alias with require_alias=true. 
This means if + // users tamper during this operation (delete indices or restore a + // snapshot), we could end up auto-creating an index without the correct + // mappings. Such tampering could lead to many other problems and is + // probably unlikely so for now we'll accept this risk and wait till + // system indices puts in place a hard control. + require_alias: false, + wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, + refresh, + filter_path: ['items.*.error'], + body: transformedDocs.flatMap((doc) => { + return [ + { + index: { + _index: index, + _id: doc._id, + // overwrite existing documents + op_type: 'index', + // use optimistic concurrency control to ensure that outdated + // documents are only overwritten once with the latest version + if_seq_no: doc._seq_no, + if_primary_term: doc._primary_term, + }, + }, + doc._source, + ]; + }), + }) + .then((res) => { + // Filter out version_conflict_engine_exception since these just mean + // that another instance already updated these documents + const errors = (res.body.items ?? []).filter( + (item) => item.index?.error?.type !== 'version_conflict_engine_exception' + ); + if (errors.length === 0) { + return Either.right('bulk_index_succeeded' as const); + } else { + throw new Error(JSON.stringify(errors)); + } + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/clone_index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/clone_index.test.ts new file mode 100644 index 0000000000000..84b4b00bc7e7f --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/clone_index.test.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { cloneIndex } from './clone_index'; +import { setWriteBlock } from './set_write_block'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +jest.mock('./catch_retryable_es_client_errors'); + +describe('cloneIndex', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. 
+ const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = cloneIndex({ + client, + source: 'my_source_index', + target: 'my_target_index', + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/clone_index.ts b/src/core/server/saved_objects/migrationsv2/actions/clone_index.ts new file mode 100644 index 0000000000000..5674535c80328 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/clone_index.ts @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { pipe } from 'fp-ts/lib/pipeable'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import type { IndexNotFound, AcknowledgeResponse } from './'; +import { waitForIndexStatusYellow } from './wait_for_index_status_yellow'; +import { + DEFAULT_TIMEOUT, + INDEX_AUTO_EXPAND_REPLICAS, + INDEX_NUMBER_OF_SHARDS, + WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, +} from './constants'; +export type CloneIndexResponse = AcknowledgeResponse; + +/** @internal */ +export interface CloneIndexParams { + client: ElasticsearchClient; + source: string; + target: string; + /** only used for testing */ + timeout?: string; +} +/** + * Makes a clone of the source index into the target. + * + * @remarks + * This method adds some additional logic to the ES clone index API: + * - it is idempotent, if it gets called multiple times subsequent calls will + * wait for the first clone operation to complete (up to 60s) + * - the first call will wait up to 120s for the cluster state and all shards + * to be updated. 
+ */ +export const cloneIndex = ({ + client, + source, + target, + timeout = DEFAULT_TIMEOUT, +}: CloneIndexParams): TaskEither.TaskEither< + RetryableEsClientError | IndexNotFound, + CloneIndexResponse +> => { + const cloneTask: TaskEither.TaskEither< + RetryableEsClientError | IndexNotFound, + AcknowledgeResponse + > = () => { + return client.indices + .clone( + { + index: source, + target, + wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, + body: { + settings: { + index: { + // The source we're cloning from will have a write block set, so + // we need to remove it to allow writes to our newly cloned index + 'blocks.write': false, + number_of_shards: INDEX_NUMBER_OF_SHARDS, + auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, + // Set an explicit refresh interval so that we don't inherit the + // value from incorrectly configured index templates (not required + // after we adopt system indices) + refresh_interval: '1s', + // Bump priority so that recovery happens before newer indices + priority: 10, + }, + }, + }, + timeout, + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ) + .then((res) => { + /** + * - acknowledged=false, we timed out before the cluster state was + * updated with the newly created index, but it probably will be + * created sometime soon. + * - shards_acknowledged=false, we timed out before all shards were + * started + * - acknowledged=true, shards_acknowledged=true, cloning complete + */ + return Either.right({ + acknowledged: res.body.acknowledged, + shardsAcknowledged: res.body.shards_acknowledged, + }); + }) + .catch((error: EsErrors.ResponseError) => { + if (error?.body?.error?.type === 'index_not_found_exception') { + return Either.left({ + type: 'index_not_found_exception' as const, + index: error.body.error.index, + }); + } else if (error?.body?.error?.type === 'resource_already_exists_exception') { + /** + * If the target index already exists it means a previous clone + * operation had already been started. However, we can't be sure + * that all shards were started so return shardsAcknowledged: false + */ + return Either.right({ + acknowledged: true, + shardsAcknowledged: false, + }); + } else { + throw error; + } + }) + .catch(catchRetryableEsClientErrors); + }; + + return pipe( + cloneTask, + TaskEither.chain((res) => { + if (res.acknowledged && res.shardsAcknowledged) { + // If the cluster state was updated and all shards ackd we're done + return TaskEither.right(res); + } else { + // Otherwise, wait until the target index has a 'green' status. + return pipe( + waitForIndexStatusYellow({ client, index: target, timeout }), + TaskEither.map((value) => { + /** When the index status is 'green' we know that all shards were started */ + return { acknowledged: true, shardsAcknowledged: true }; + }) + ); + } + }) + ); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/close_pit.test.ts b/src/core/server/saved_objects/migrationsv2/actions/close_pit.test.ts new file mode 100644 index 0000000000000..5d9696239a61e --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/close_pit.test.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +jest.mock('./catch_retryable_es_client_errors'); +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { closePit } from './close_pit'; + +describe('closePit', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = closePit({ client, pitId: 'pitId' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/close_pit.ts b/src/core/server/saved_objects/migrationsv2/actions/close_pit.ts new file mode 100644 index 0000000000000..d421950c839e2 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/close_pit.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; + +/** @internal */ +export interface ClosePitParams { + client: ElasticsearchClient; + pitId: string; +} +/* + * Closes PIT. + * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html + * */ +export const closePit = ({ + client, + pitId, +}: ClosePitParams): TaskEither.TaskEither => () => { + return client + .closePointInTime({ + body: { id: pitId }, + }) + .then((response) => { + if (!response.body.succeeded) { + throw new Error(`Failed to close PointInTime with id: ${pitId}`); + } + return Either.right({}); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/constants.ts b/src/core/server/saved_objects/migrationsv2/actions/constants.ts new file mode 100644 index 0000000000000..5d0d2ffe5d695 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/constants.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +/** + * Batch size for updateByQuery and reindex operations. + * Uses the default value of 1000 for Elasticsearch reindex operation. 
+ */ +export const BATCH_SIZE = 1_000; +export const DEFAULT_TIMEOUT = '60s'; +/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */ +export const INDEX_AUTO_EXPAND_REPLICAS = '0-1'; +/** ES rule of thumb: shards should be several GB to 10's of GB, so Kibana is unlikely to cross that limit */ +export const INDEX_NUMBER_OF_SHARDS = 1; +/** Wait for all shards to be active before starting an operation */ +export const WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE = 'all'; diff --git a/src/core/server/saved_objects/migrationsv2/actions/create_index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/create_index.test.ts new file mode 100644 index 0000000000000..d5d906898943c --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/create_index.test.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { createIndex } from './create_index'; +import { setWriteBlock } from './set_write_block'; + +describe('createIndex', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = createIndex({ + client, + indexName: 'new_index', + mappings: { properties: {} }, + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/create_index.ts b/src/core/server/saved_objects/migrationsv2/actions/create_index.ts new file mode 100644 index 0000000000000..47ee44e762db7 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/create_index.ts @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { pipe } from 'fp-ts/lib/pipeable'; +import type { estypes } from '@elastic/elasticsearch'; +import { AcknowledgeResponse } from './index'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { IndexMapping } from '../../mappings'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { + DEFAULT_TIMEOUT, + INDEX_AUTO_EXPAND_REPLICAS, + WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, +} from './constants'; +import { waitForIndexStatusYellow } from './wait_for_index_status_yellow'; + +function aliasArrayToRecord(aliases: string[]): Record { + const result: Record = {}; + for (const alias of aliases) { + result[alias] = {}; + } + return result; +} + +/** @internal */ +export interface CreateIndexParams { + client: ElasticsearchClient; + indexName: string; + mappings: IndexMapping; + aliases?: string[]; +} +/** + * Creates an index with the given mappings + * + * @remarks + * This method adds some additional logic to the ES create index API: + * - it is idempotent, if it gets called multiple times subsequent calls will + * wait for the first create operation to complete (up to 60s) + * - the first call will wait up to 120s for the cluster state and all shards + * to be updated. + */ +export const createIndex = ({ + client, + indexName, + mappings, + aliases = [], +}: CreateIndexParams): TaskEither.TaskEither => { + const createIndexTask: TaskEither.TaskEither< + RetryableEsClientError, + AcknowledgeResponse + > = () => { + const aliasesObject = aliasArrayToRecord(aliases); + + return client.indices + .create( + { + index: indexName, + // wait until all shards are available before creating the index + // (since number_of_shards=1 this does not have any effect atm) + wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, + // Wait up to 60s for the cluster state to update and all shards to be + // started + timeout: DEFAULT_TIMEOUT, + body: { + mappings, + aliases: aliasesObject, + settings: { + index: { + // ES rule of thumb: shards should be several GB to 10's of GB, so + // Kibana is unlikely to cross that limit. + number_of_shards: 1, + auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, + // Set an explicit refresh interval so that we don't inherit the + // value from incorrectly configured index templates (not required + // after we adopt system indices) + refresh_interval: '1s', + // Bump priority so that recovery happens before newer indices + priority: 10, + }, + }, + }, + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ) + .then((res) => { + /** + * - acknowledged=false, we timed out before the cluster state was + * updated on all nodes with the newly created index, but it + * probably will be created sometime soon. + * - shards_acknowledged=false, we timed out before all shards were + * started + * - acknowledged=true, shards_acknowledged=true, index creation complete + */ + return Either.right({ + acknowledged: res.body.acknowledged, + shardsAcknowledged: res.body.shards_acknowledged, + }); + }) + .catch((error) => { + if (error?.body?.error?.type === 'resource_already_exists_exception') { + /** + * If the target index already exists it means a previous create + * operation had already been started. 
However, we can't be sure + * that all shards were started so return shardsAcknowledged: false + */ + return Either.right({ + acknowledged: true, + shardsAcknowledged: false, + }); + } else { + throw error; + } + }) + .catch(catchRetryableEsClientErrors); + }; + + return pipe( + createIndexTask, + TaskEither.chain((res) => { + if (res.acknowledged && res.shardsAcknowledged) { + // If the cluster state was updated and all shards ackd we're done + return TaskEither.right('create_index_succeeded'); + } else { + // Otherwise, wait until the target index has a 'yellow' status. + return pipe( + waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }), + TaskEither.map(() => { + /** When the index status is 'yellow' we know that all shards were started */ + return 'create_index_succeeded'; + }) + ); + } + }) + ); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.test.ts b/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.test.ts new file mode 100644 index 0000000000000..0dab1728b6ef2 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.test.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { fetchIndices } from './fetch_indices'; + +describe('fetchIndices', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = fetchIndices({ client, indices: ['my_index'] }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.ts b/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.ts new file mode 100644 index 0000000000000..3847252eb6db1 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/fetch_indices.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import * as Either from 'fp-ts/lib/Either'; +import { IndexMapping } from '../../mappings'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +export type FetchIndexResponse = Record< + string, + { aliases: Record; mappings: IndexMapping; settings: unknown } +>; + +/** @internal */ +export interface FetchIndicesParams { + client: ElasticsearchClient; + indices: string[]; +} + +/** + * Fetches information about the given indices including aliases, mappings and + * settings. + */ +export const fetchIndices = ({ + client, + indices, +}: FetchIndicesParams): TaskEither.TaskEither => + // @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required + () => { + return client.indices + .get( + { + index: indices, + ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607 + }, + { ignore: [404], maxRetries: 0 } + ) + .then(({ body }) => { + return Either.right(body); + }) + .catch(catchRetryableEsClientErrors); + }; diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts deleted file mode 100644 index 05da335d70884..0000000000000 --- a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -import * as Actions from './'; -import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; -import { errors as EsErrors } from '@elastic/elasticsearch'; -jest.mock('./catch_retryable_es_client_errors'); -import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; -import * as Option from 'fp-ts/lib/Option'; - -describe('actions', () => { - beforeEach(() => { - jest.clearAllMocks(); - }); - - // Create a mock client that rejects all methods with a 503 status code - // response. 
- const retryableError = new EsErrors.ResponseError( - elasticsearchClientMock.createApiResponse({ - statusCode: 503, - body: { error: { type: 'es_type', reason: 'es_reason' } }, - }) - ); - const client = elasticsearchClientMock.createInternalClient( - elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) - ); - - const nonRetryableError = new Error('crash'); - const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( - elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) - ); - - describe('fetchIndices', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.fetchIndices({ client, indices: ['my_index'] }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('setWriteBlock', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.setWriteBlock({ client, index: 'my_index' }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('cloneIndex', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.cloneIndex({ - client, - source: 'my_source_index', - target: 'my_target_index', - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('pickupUpdatedMappings', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.pickupUpdatedMappings(client, 'my_index'); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('openPit', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.openPit({ client, index: 'my_index' }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('readWithPit', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.readWithPit({ - client, - pitId: 'pitId', - query: { match_all: {} }, - batchSize: 10_000, - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('closePit', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.closePit({ client, pitId: 'pitId' }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('reindex', () => { - it('calls 
catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.reindex({ - client, - sourceIndex: 'my_source_index', - targetIndex: 'my_target_index', - reindexScript: Option.none, - requireAlias: false, - unusedTypesQuery: {}, - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('waitForReindexTask', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.waitForReindexTask({ client, taskId: 'my task id', timeout: '60s' }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('waitForPickupUpdatedMappingsTask', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.waitForPickupUpdatedMappingsTask({ - client, - taskId: 'my task id', - timeout: '60s', - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('updateAliases', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.updateAliases({ client, aliasActions: [] }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('createIndex', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.createIndex({ - client, - indexName: 'new_index', - mappings: { properties: {} }, - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock({ - client: clientWithNonRetryableError, - index: 'my_index', - }); - await task(); - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); - }); - }); - - describe('updateAndPickupMappings', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.updateAndPickupMappings({ - client, - index: 'new_index', - mappings: { properties: {} }, - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('searchForOutdatedDocuments', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.searchForOutdatedDocuments(client, { - 
batchSize: 1000, - targetIndex: 'new_index', - outdatedDocumentsQuery: {}, - }); - - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - - it('configures request according to given parameters', async () => { - const esClient = elasticsearchClientMock.createInternalClient(); - const query = {}; - const targetIndex = 'new_index'; - const batchSize = 1000; - const task = Actions.searchForOutdatedDocuments(esClient, { - batchSize, - targetIndex, - outdatedDocumentsQuery: query, - }); - - await task(); - - expect(esClient.search).toHaveBeenCalledTimes(1); - expect(esClient.search).toHaveBeenCalledWith( - expect.objectContaining({ - index: targetIndex, - size: batchSize, - body: expect.objectContaining({ query }), - }) - ); - }); - }); - - describe('bulkOverwriteTransformedDocuments', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.bulkOverwriteTransformedDocuments({ - client, - index: 'new_index', - transformedDocs: [], - refresh: 'wait_for', - }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - - describe('refreshIndex', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.refreshIndex({ client, targetIndex: 'target_index' }); - try { - await task(); - } catch (e) { - /** ignore */ - } - - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); -}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index 905d64947298e..98d7167ffc31a 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -6,1231 +6,126 @@ * Side Public License, v 1. */ -import * as Either from 'fp-ts/lib/Either'; -import * as TaskEither from 'fp-ts/lib/TaskEither'; -import * as Option from 'fp-ts/lib/Option'; -import type { estypes } from '@elastic/elasticsearch'; -import { errors as EsErrors } from '@elastic/elasticsearch'; -import type { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors'; -import { pipe } from 'fp-ts/lib/pipeable'; -import { flow } from 'fp-ts/lib/function'; -import { ElasticsearchClient } from '../../../elasticsearch'; -import { IndexMapping } from '../../mappings'; -import type { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization'; -import type { TransformRawDocs } from '../types'; -import { - catchRetryableEsClientErrors, - RetryableEsClientError, -} from './catch_retryable_es_client_errors'; -import { - DocumentsTransformFailed, - DocumentsTransformSuccess, -} from '../../migrations/core/migrate_raw_docs'; -export type { RetryableEsClientError }; - -/** - * Batch size for updateByQuery and reindex operations. - * Uses the default value of 1000 for Elasticsearch reindex operation. 
- */ -const BATCH_SIZE = 1_000; -const DEFAULT_TIMEOUT = '60s'; -/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */ -const INDEX_AUTO_EXPAND_REPLICAS = '0-1'; -/** ES rule of thumb: shards should be several GB to 10's of GB, so Kibana is unlikely to cross that limit */ -const INDEX_NUMBER_OF_SHARDS = 1; -/** Wait for all shards to be active before starting an operation */ -const WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE = 'all'; - -// Map of left response 'type' string -> response interface -export interface ActionErrorTypeMap { - wait_for_task_completion_timeout: WaitForTaskCompletionTimeout; - retryable_es_client_error: RetryableEsClientError; - index_not_found_exception: IndexNotFound; - target_index_had_write_block: TargetIndexHadWriteBlock; - incompatible_mapping_exception: IncompatibleMappingException; - alias_not_found_exception: AliasNotFound; - remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex; - documents_transform_failed: DocumentsTransformFailed; -} - -/** - * Type guard for narrowing the type of a left - */ -export function isLeftTypeof( - res: any, - typeString: T -): res is ActionErrorTypeMap[T] { - return res.type === typeString; -} - -export type FetchIndexResponse = Record< - string, - { aliases: Record; mappings: IndexMapping; settings: unknown } ->; +import { RetryableEsClientError } from './catch_retryable_es_client_errors'; +import { DocumentsTransformFailed } from '../../migrations/core/migrate_raw_docs'; -/** @internal */ -export interface FetchIndicesParams { - client: ElasticsearchClient; - indices: string[]; -} - -/** - * Fetches information about the given indices including aliases, mappings and - * settings. - */ -export const fetchIndices = ({ - client, - indices, -}: FetchIndicesParams): TaskEither.TaskEither => - // @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required - () => { - return client.indices - .get( - { - index: indices, - ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607 - }, - { ignore: [404], maxRetries: 0 } - ) - .then(({ body }) => { - return Either.right(body); - }) - .catch(catchRetryableEsClientErrors); - }; +export { + BATCH_SIZE, + DEFAULT_TIMEOUT, + INDEX_AUTO_EXPAND_REPLICAS, + INDEX_NUMBER_OF_SHARDS, + WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, +} from './constants'; -export interface IndexNotFound { - type: 'index_not_found_exception'; - index: string; -} +export type { RetryableEsClientError }; -/** @internal */ -export interface SetWriteBlockParams { - client: ElasticsearchClient; - index: string; -} -/** - * Sets a write block in place for the given index. If the response includes - * `acknowledged: true` all in-progress writes have drained and no further - * writes to this index will be possible. 
- * - * The first time the write block is added to an index the response will - * include `shards_acknowledged: true` but once the block is in place, - * subsequent calls return `shards_acknowledged: false` - */ -export const setWriteBlock = ({ - client, - index, -}: SetWriteBlockParams): TaskEither.TaskEither< - IndexNotFound | RetryableEsClientError, - 'set_write_block_succeeded' -> => () => { - return ( - client.indices - .addBlock<{ - acknowledged: boolean; - shards_acknowledged: boolean; - }>( - { - index, - block: 'write', - }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) - // not typed yet - .then((res: any) => { - return res.body.acknowledged === true - ? Either.right('set_write_block_succeeded' as const) - : Either.left({ - type: 'retryable_es_client_error' as const, - message: 'set_write_block_failed', - }); - }) - .catch((e: ElasticsearchClientError) => { - if (e instanceof EsErrors.ResponseError) { - if (e.body?.error?.type === 'index_not_found_exception') { - return Either.left({ type: 'index_not_found_exception' as const, index }); - } - } - throw e; - }) - .catch(catchRetryableEsClientErrors) - ); -}; +// actions/* imports +export type { FetchIndexResponse, FetchIndicesParams } from './fetch_indices'; +export { fetchIndices } from './fetch_indices'; -/** @internal */ -export interface RemoveWriteBlockParams { - client: ElasticsearchClient; - index: string; -} -/** - * Removes a write block from an index - */ -export const removeWriteBlock = ({ - client, - index, -}: RemoveWriteBlockParams): TaskEither.TaskEither< - RetryableEsClientError, - 'remove_write_block_succeeded' -> => () => { - return client.indices - .putSettings<{ - acknowledged: boolean; - shards_acknowledged: boolean; - }>( - { - index, - // Don't change any existing settings - preserve_existing: true, - body: { - index: { - blocks: { - write: false, - }, - }, - }, - }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) - .then((res) => { - return res.body.acknowledged === true - ? Either.right('remove_write_block_succeeded' as const) - : Either.left({ - type: 'retryable_es_client_error' as const, - message: 'remove_write_block_failed', - }); - }) - .catch(catchRetryableEsClientErrors); -}; +export type { SetWriteBlockParams } from './set_write_block'; +export { setWriteBlock } from './set_write_block'; -/** @internal */ -export interface WaitForIndexStatusYellowParams { - client: ElasticsearchClient; - index: string; - timeout?: string; -} -/** - * A yellow index status means the index's primary shard is allocated and the - * index is ready for searching/indexing documents, but ES wasn't able to - * allocate the replicas. When migrations proceed with a yellow index it means - * we don't have as much data-redundancy as we could have, but waiting for - * replicas would mean that v2 migrations fail where v1 migrations would have - * succeeded. It doesn't feel like it's Kibana's job to force users to keep - * their clusters green and even if it's green when we migrate it can turn - * yellow at any point in the future. So ultimately data-redundancy is up to - * users to maintain. 
- */ -export const waitForIndexStatusYellow = ({ - client, - index, - timeout = DEFAULT_TIMEOUT, -}: WaitForIndexStatusYellowParams): TaskEither.TaskEither => () => { - return client.cluster - .health({ index, wait_for_status: 'yellow', timeout }) - .then(() => { - return Either.right({}); - }) - .catch(catchRetryableEsClientErrors); -}; +export type { RemoveWriteBlockParams } from './remove_write_block'; +export { removeWriteBlock } from './remove_write_block'; -export type CloneIndexResponse = AcknowledgeResponse; +export type { CloneIndexResponse, CloneIndexParams } from './clone_index'; +export { cloneIndex } from './clone_index'; -/** @internal */ -export interface CloneIndexParams { - client: ElasticsearchClient; - source: string; - target: string; - /** only used for testing */ - timeout?: string; -} -/** - * Makes a clone of the source index into the target. - * - * @remarks - * This method adds some additional logic to the ES clone index API: - * - it is idempotent, if it gets called multiple times subsequent calls will - * wait for the first clone operation to complete (up to 60s) - * - the first call will wait up to 120s for the cluster state and all shards - * to be updated. - */ -export const cloneIndex = ({ - client, - source, - target, - timeout = DEFAULT_TIMEOUT, -}: CloneIndexParams): TaskEither.TaskEither< - RetryableEsClientError | IndexNotFound, - CloneIndexResponse -> => { - const cloneTask: TaskEither.TaskEither< - RetryableEsClientError | IndexNotFound, - AcknowledgeResponse - > = () => { - return client.indices - .clone( - { - index: source, - target, - wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, - body: { - settings: { - index: { - // The source we're cloning from will have a write block set, so - // we need to remove it to allow writes to our newly cloned index - 'blocks.write': false, - number_of_shards: INDEX_NUMBER_OF_SHARDS, - auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, - // Set an explicit refresh interval so that we don't inherit the - // value from incorrectly configured index templates (not required - // after we adopt system indices) - refresh_interval: '1s', - // Bump priority so that recovery happens before newer indices - priority: 10, - }, - }, - }, - timeout, - }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) - .then((res) => { - /** - * - acknowledged=false, we timed out before the cluster state was - * updated with the newly created index, but it probably will be - * created sometime soon. - * - shards_acknowledged=false, we timed out before all shards were - * started - * - acknowledged=true, shards_acknowledged=true, cloning complete - */ - return Either.right({ - acknowledged: res.body.acknowledged, - shardsAcknowledged: res.body.shards_acknowledged, - }); - }) - .catch((error: EsErrors.ResponseError) => { - if (error?.body?.error?.type === 'index_not_found_exception') { - return Either.left({ - type: 'index_not_found_exception' as const, - index: error.body.error.index, - }); - } else if (error?.body?.error?.type === 'resource_already_exists_exception') { - /** - * If the target index already exists it means a previous clone - * operation had already been started. 
However, we can't be sure - * that all shards were started so return shardsAcknowledged: false - */ - return Either.right({ - acknowledged: true, - shardsAcknowledged: false, - }); - } else { - throw error; - } - }) - .catch(catchRetryableEsClientErrors); - }; +export type { WaitForIndexStatusYellowParams } from './wait_for_index_status_yellow'; +import { waitForIndexStatusYellow } from './wait_for_index_status_yellow'; - return pipe( - cloneTask, - TaskEither.chain((res) => { - if (res.acknowledged && res.shardsAcknowledged) { - // If the cluster state was updated and all shards ackd we're done - return TaskEither.right(res); - } else { - // Otherwise, wait until the target index has a 'green' status. - return pipe( - waitForIndexStatusYellow({ client, index: target, timeout }), - TaskEither.map((value) => { - /** When the index status is 'green' we know that all shards were started */ - return { acknowledged: true, shardsAcknowledged: true }; - }) - ); - } - }) - ); -}; +export type { WaitForTaskResponse, WaitForTaskCompletionTimeout } from './wait_for_task'; +import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; -interface WaitForTaskResponse { - error: Option.Option<{ type: string; reason: string; index: string }>; - completed: boolean; - failures: Option.Option; - description?: string; -} +export type { UpdateByQueryResponse } from './pickup_updated_mappings'; +import { pickupUpdatedMappings } from './pickup_updated_mappings'; -/** - * After waiting for the specificed timeout, the task has not yet completed. - * - * When querying the tasks API we use `wait_for_completion=true` to block the - * request until the task completes. If after the `timeout`, the task still has - * not completed we return this error. This does not mean that the task itelf - * has reached a timeout, Elasticsearch will continue to run the task. - */ -export interface WaitForTaskCompletionTimeout { - /** After waiting for the specificed timeout, the task has not yet completed. */ - readonly type: 'wait_for_task_completion_timeout'; - readonly message: string; - readonly error?: Error; -} +export type { OpenPitResponse, OpenPitParams } from './open_pit'; +export { openPit, pitKeepAlive } from './open_pit'; -const catchWaitForTaskCompletionTimeout = ( - e: ResponseError -): Either.Either => { - if ( - e.body?.error?.type === 'timeout_exception' || - e.body?.error?.type === 'receive_timeout_transport_exception' - ) { - return Either.left({ - type: 'wait_for_task_completion_timeout' as const, - message: `[${e.body.error.type}] ${e.body.error.reason}`, - error: e, - }); - } else { - throw e; - } -}; +export type { ReadWithPit, ReadWithPitParams } from './read_with_pit'; +export { readWithPit } from './read_with_pit'; -/** @internal */ -export interface WaitForTaskParams { - client: ElasticsearchClient; - taskId: string; - timeout: string; -} -/** - * Blocks for up to 60s or until a task completes. - * - * TODO: delete completed tasks - */ -const waitForTask = ({ - client, - taskId, - timeout, -}: WaitForTaskParams): TaskEither.TaskEither< - RetryableEsClientError | WaitForTaskCompletionTimeout, - WaitForTaskResponse -> => () => { - return client.tasks - .get({ - task_id: taskId, - wait_for_completion: true, - timeout, - }) - .then((res) => { - const body = res.body; - const failures = body.response?.failures ?? 
[]; - return Either.right({ - completed: body.completed, - // @ts-expect-error @elastic/elasticsearch GetTaskResponse doesn't declare `error` property - error: Option.fromNullable(body.error), - failures: failures.length > 0 ? Option.some(failures) : Option.none, - description: body.task.description, - }); - }) - .catch(catchWaitForTaskCompletionTimeout) - .catch(catchRetryableEsClientErrors); -}; +export type { ClosePitParams } from './close_pit'; +export { closePit } from './close_pit'; -export interface UpdateByQueryResponse { - taskId: string; -} +export type { TransformDocsParams } from './transform_docs'; +export { transformDocs } from './transform_docs'; -/** - * Pickup updated mappings by performing an update by query operation on all - * documents in the index. Returns a task ID which can be - * tracked for progress. - * - * @remarks When mappings are updated to add a field which previously wasn't - * mapped Elasticsearch won't automatically add existing documents to it's - * internal search indices. So search results on this field won't return any - * existing documents. By running an update by query we essentially refresh - * these the internal search indices for all existing documents. - * This action uses `conflicts: 'proceed'` allowing several Kibana instances - * to run this in parallel. - */ -export const pickupUpdatedMappings = ( - client: ElasticsearchClient, - index: string -): TaskEither.TaskEither => () => { - return client - .updateByQuery({ - // Ignore version conflicts that can occur from parallel update by query operations - conflicts: 'proceed', - // Return an error when targeting missing or closed indices - allow_no_indices: false, - index, - // How many documents to update per batch - scroll_size: BATCH_SIZE, - // force a refresh so that we can query the updated index immediately - // after the operation completes - refresh: true, - // Create a task and return task id instead of blocking until complete - wait_for_completion: false, - }) - .then(({ body: { task: taskId } }) => { - return Either.right({ taskId: String(taskId!) }); - }) - .catch(catchRetryableEsClientErrors); -}; +export type { RefreshIndexParams } from './refresh_index'; +export { refreshIndex } from './refresh_index'; -/** @internal */ -export interface OpenPitResponse { - pitId: string; -} +export type { ReindexResponse, ReindexParams } from './reindex'; +export { reindex } from './reindex'; -/** @internal */ -export interface OpenPitParams { - client: ElasticsearchClient; - index: string; -} -// how long ES should keep PIT alive -const pitKeepAlive = '10m'; -/* - * Creates a lightweight view of data when the request has been initiated. 
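[Reviewer aside, not part of the patch] The point-in-time actions re-exported above (openPit, readWithPit, closePit) compose into a paging loop roughly like the sketch below. The index name, query, batch size, and import paths are assumed placeholders.

import * as Either from 'fp-ts/lib/Either';
import { ElasticsearchClient } from '../../../elasticsearch';
import { openPit, readWithPit, closePit } from './';

// Sketch only: page through every document in an index via a point-in-time.
async function readAllDocuments(client: ElasticsearchClient) {
  const opened = await openPit({ client, index: 'my_index' })();
  if (Either.isLeft(opened)) {
    return; // retryable ES client error; the caller decides how to retry
  }
  const pitId = opened.right.pitId;
  let searchAfter: number[] | undefined;
  while (true) {
    const page = await readWithPit({
      client,
      pitId,
      query: { match_all: {} },
      batchSize: 1000,
      searchAfter,
    })();
    if (Either.isLeft(page) || page.right.outdatedDocuments.length === 0) {
      break;
    }
    // ...process page.right.outdatedDocuments here...
    searchAfter = page.right.lastHitSortValue;
  }
  await closePit({ client, pitId })();
}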
- * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html - * */ -export const openPit = ({ - client, - index, -}: OpenPitParams): TaskEither.TaskEither => () => { - return client - .openPointInTime({ - index, - keep_alive: pitKeepAlive, - }) - .then((response) => Either.right({ pitId: response.body.id })) - .catch(catchRetryableEsClientErrors); -}; +import type { IncompatibleMappingException } from './wait_for_reindex_task'; +export { waitForReindexTask } from './wait_for_reindex_task'; -/** @internal */ -export interface ReadWithPit { - outdatedDocuments: SavedObjectsRawDoc[]; - readonly lastHitSortValue: number[] | undefined; - readonly totalHits: number | undefined; -} +export type { VerifyReindexParams } from './verify_reindex'; +export { verifyReindex } from './verify_reindex'; -/** @internal */ +import type { AliasNotFound, RemoveIndexNotAConcreteIndex } from './update_aliases'; +export type { AliasAction, UpdateAliasesParams } from './update_aliases'; +export { updateAliases } from './update_aliases'; -export interface ReadWithPitParams { - client: ElasticsearchClient; - pitId: string; - query: estypes.QueryContainer; - batchSize: number; - searchAfter?: number[]; - seqNoPrimaryTerm?: boolean; -} +export type { CreateIndexParams } from './create_index'; +export { createIndex } from './create_index'; -/* - * Requests documents from the index using PIT mechanism. - * */ -export const readWithPit = ({ - client, - pitId, - query, - batchSize, - searchAfter, - seqNoPrimaryTerm, -}: ReadWithPitParams): TaskEither.TaskEither => () => { - return client - .search({ - seq_no_primary_term: seqNoPrimaryTerm, - body: { - // Sort fields are required to use searchAfter - sort: { - // the most efficient option as order is not important for the migration - _shard_doc: { order: 'asc' }, - }, - pit: { id: pitId, keep_alive: pitKeepAlive }, - size: batchSize, - search_after: searchAfter, - /** - * We want to know how many documents we need to process so we can log the progress. - * But we also want to increase the performance of these requests, - * so we ask ES to report the total count only on the first request (when searchAfter does not exist) - */ - track_total_hits: typeof searchAfter === 'undefined', - query, - }, - }) - .then((response) => { - const totalHits = - typeof response.body.hits.total === 'number' - ? response.body.hits.total // This format is to be removed in 8.0 - : response.body.hits.total?.value; - const hits = response.body.hits.hits; +export type { + UpdateAndPickupMappingsResponse, + UpdateAndPickupMappingsParams, +} from './update_and_pickup_mappings'; +export { updateAndPickupMappings } from './update_and_pickup_mappings'; - if (hits.length > 0) { - return Either.right({ - // @ts-expect-error @elastic/elasticsearch _source is optional - outdatedDocuments: hits as SavedObjectsRawDoc[], - lastHitSortValue: hits[hits.length - 1].sort as number[], - totalHits, - }); - } +export { waitForPickupUpdatedMappingsTask } from './wait_for_pickup_updated_mappings_task'; - return Either.right({ - outdatedDocuments: [], - lastHitSortValue: undefined, - totalHits, - }); - }) - .catch(catchRetryableEsClientErrors); -}; +export type { + SearchResponse, + SearchForOutdatedDocumentsOptions, +} from './search_for_outdated_documents'; +export { searchForOutdatedDocuments } from './search_for_outdated_documents'; -/** @internal */ -export interface ClosePitParams { - client: ElasticsearchClient; - pitId: string; -} -/* - * Closes PIT. 
- * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html - * */ -export const closePit = ({ - client, - pitId, -}: ClosePitParams): TaskEither.TaskEither => () => { - return client - .closePointInTime({ - body: { id: pitId }, - }) - .then((response) => { - if (!response.body.succeeded) { - throw new Error(`Failed to close PointInTime with id: ${pitId}`); - } - return Either.right({}); - }) - .catch(catchRetryableEsClientErrors); -}; +export type { BulkOverwriteTransformedDocumentsParams } from './bulk_overwrite_transformed_documents'; +export { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents'; -/** @internal */ -export interface TransformDocsParams { - transformRawDocs: TransformRawDocs; - outdatedDocuments: SavedObjectsRawDoc[]; -} -/* - * Transform outdated docs - * */ -export const transformDocs = ({ - transformRawDocs, - outdatedDocuments, -}: TransformDocsParams): TaskEither.TaskEither< - DocumentsTransformFailed, - DocumentsTransformSuccess -> => transformRawDocs(outdatedDocuments); +export { pickupUpdatedMappings, waitForTask, waitForIndexStatusYellow }; +export type { AliasNotFound, RemoveIndexNotAConcreteIndex }; -/** @internal */ -export interface ReindexResponse { - taskId: string; -} - -/** @internal */ -export interface RefreshIndexParams { - client: ElasticsearchClient; - targetIndex: string; -} -/** - * Wait for Elasticsearch to reindex all the changes. - */ -export const refreshIndex = ({ - client, - targetIndex, -}: RefreshIndexParams): TaskEither.TaskEither< - RetryableEsClientError, - { refreshed: boolean } -> => () => { - return client.indices - .refresh({ - index: targetIndex, - }) - .then(() => { - return Either.right({ refreshed: true }); - }) - .catch(catchRetryableEsClientErrors); -}; -/** @internal */ -export interface ReindexParams { - client: ElasticsearchClient; - sourceIndex: string; - targetIndex: string; - reindexScript: Option.Option; - requireAlias: boolean; - /* When reindexing we use a source query to exclude saved objects types which - * are no longer used. These saved objects will still be kept in the outdated - * index for backup purposes, but won't be available in the upgraded index. - */ - unusedTypesQuery: estypes.QueryContainer; +export interface IndexNotFound { + type: 'index_not_found_exception'; + index: string; } -/** - * Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a - * task ID which can be tracked for progress. - * - * @remarks This action is idempotent allowing several Kibana instances to run - * this in parallel. By using `op_type: 'create', conflicts: 'proceed'` there - * will be only one write per reindexed document. - */ -export const reindex = ({ - client, - sourceIndex, - targetIndex, - reindexScript, - requireAlias, - unusedTypesQuery, -}: ReindexParams): TaskEither.TaskEither => () => { - return client - .reindex({ - // Require targetIndex to be an alias. Prevents a new index from being - // created if targetIndex doesn't exist. 
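[Reviewer aside, not part of the patch] A reindex is started as an Elasticsearch task and then awaited via waitForReindexTask; a hedged sketch follows, with index names and the timeout as assumed placeholders.

import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClient } from '../../../elasticsearch';
import { reindex, waitForReindexTask } from './';

// Sketch only: start a reindex task and block until it completes or fails.
async function reindexAndWait(client: ElasticsearchClient) {
  const started = await reindex({
    client,
    sourceIndex: '.kibana_1', // placeholder
    targetIndex: '.kibana_2', // placeholder
    reindexScript: Option.none,
    requireAlias: false,
    unusedTypesQuery: { match_all: {} },
  })();
  if (Either.isLeft(started)) {
    return started;
  }
  // waitForReindexTask shares waitForTask's parameters: client, taskId, timeout.
  return waitForReindexTask({ client, taskId: started.right.taskId, timeout: '60s' })();
}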
- require_alias: requireAlias, - body: { - // Ignore version conflicts from existing documents - conflicts: 'proceed', - source: { - index: sourceIndex, - // Set reindex batch size - size: BATCH_SIZE, - // Exclude saved object types - query: unusedTypesQuery, - }, - dest: { - index: targetIndex, - // Don't override existing documents, only create if missing - op_type: 'create', - }, - script: Option.fold( - () => undefined, - (script) => ({ - source: script, - lang: 'painless', - }) - )(reindexScript), - }, - // force a refresh so that we can query the target index - refresh: true, - // Create a task and return task id instead of blocking until complete - wait_for_completion: false, - }) - .then(({ body: { task: taskId } }) => { - return Either.right({ taskId: String(taskId) }); - }) - .catch(catchRetryableEsClientErrors); -}; - -interface WaitForReindexTaskFailure { +export interface WaitForReindexTaskFailure { readonly cause: { type: string; reason: string }; } - -/** @internal */ export interface TargetIndexHadWriteBlock { type: 'target_index_had_write_block'; } -/** @internal */ -export interface IncompatibleMappingException { - type: 'incompatible_mapping_exception'; -} - -export const waitForReindexTask = flow( - waitForTask, - TaskEither.chain( - ( - res - ): TaskEither.TaskEither< - | IndexNotFound - | TargetIndexHadWriteBlock - | IncompatibleMappingException - | RetryableEsClientError - | WaitForTaskCompletionTimeout, - 'reindex_succeeded' - > => { - const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) => - type === 'cluster_block_exception' && - reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/); - - const failureIsIncompatibleMappingException = ({ - cause: { type, reason }, - }: WaitForReindexTaskFailure) => - type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; - - if (Option.isSome(res.error)) { - if (res.error.value.type === 'index_not_found_exception') { - return TaskEither.left({ - type: 'index_not_found_exception' as const, - index: res.error.value.index, - }); - } else { - throw new Error('Reindex failed with the following error:\n' + JSON.stringify(res.error)); - } - } else if (Option.isSome(res.failures)) { - if (res.failures.value.every(failureIsAWriteBlock)) { - return TaskEither.left({ type: 'target_index_had_write_block' as const }); - } else if (res.failures.value.every(failureIsIncompatibleMappingException)) { - return TaskEither.left({ type: 'incompatible_mapping_exception' as const }); - } else { - throw new Error( - 'Reindex failed with the following failures:\n' + JSON.stringify(res.failures.value) - ); - } - } else { - return TaskEither.right('reindex_succeeded' as const); - } - } - ) -); - -/** @internal */ -export interface VerifyReindexParams { - client: ElasticsearchClient; - sourceIndex: string; - targetIndex: string; -} - -export const verifyReindex = ({ - client, - sourceIndex, - targetIndex, -}: VerifyReindexParams): TaskEither.TaskEither< - RetryableEsClientError | { type: 'verify_reindex_failed' }, - 'verify_reindex_succeeded' -> => () => { - const count = (index: string) => - client - .count<{ count: number }>({ - index, - // Return an error when targeting missing or closed indices - allow_no_indices: false, - }) - .then((res) => { - return res.body.count; - }); - - return Promise.all([count(sourceIndex), count(targetIndex)]) - .then(([sourceCount, targetCount]) => { - if (targetCount >= sourceCount) { - return Either.right('verify_reindex_succeeded' as 
const); - } else { - return Either.left({ type: 'verify_reindex_failed' as const }); - } - }) - .catch(catchRetryableEsClientErrors); -}; - -export const waitForPickupUpdatedMappingsTask = flow( - waitForTask, - TaskEither.chain( - ( - res - ): TaskEither.TaskEither< - RetryableEsClientError | WaitForTaskCompletionTimeout, - 'pickup_updated_mappings_succeeded' - > => { - // We don't catch or type failures/errors because they should never - // occur in our migration algorithm and we don't have any business logic - // for dealing with it. If something happens we'll just crash and try - // again. - if (Option.isSome(res.failures)) { - throw new Error( - 'pickupUpdatedMappings task failed with the following failures:\n' + - JSON.stringify(res.failures.value) - ); - } else if (Option.isSome(res.error)) { - throw new Error( - 'pickupUpdatedMappings task failed with the following error:\n' + - JSON.stringify(res.error.value) - ); - } else { - return TaskEither.right('pickup_updated_mappings_succeeded' as const); - } - } - ) -); -export interface AliasNotFound { - type: 'alias_not_found_exception'; -} - -/** @internal */ -export interface RemoveIndexNotAConcreteIndex { - type: 'remove_index_not_a_concrete_index'; -} - -/** @internal */ -export type AliasAction = - | { remove_index: { index: string } } - | { remove: { index: string; alias: string; must_exist: boolean } } - | { add: { index: string; alias: string } }; - -/** @internal */ -export interface UpdateAliasesParams { - client: ElasticsearchClient; - aliasActions: AliasAction[]; -} -/** - * Calls the Update index alias API `_alias` with the provided alias actions. - */ -export const updateAliases = ({ - client, - aliasActions, -}: UpdateAliasesParams): TaskEither.TaskEither< - IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, - 'update_aliases_succeeded' -> => () => { - return client.indices - .updateAliases( - { - body: { - actions: aliasActions, - }, - }, - { maxRetries: 0 } - ) - .then(() => { - // Ignore `acknowledged: false`. When the coordinating node accepts - // the new cluster state update but not all nodes have applied the - // update within the timeout `acknowledged` will be false. However, - // retrying this update will always immediately result in `acknowledged: - // true` even if there are still nodes which are falling behind with - // cluster state updates. - // The only impact for using `updateAliases` to mark the version index - // as ready is that it could take longer for other Kibana instances to - // see that the version index is ready so they are more likely to - // perform unecessary duplicate work. 
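[Reviewer aside, not part of the patch] The AliasAction union drives the alias swap described above; a minimal sketch with placeholder index and alias names:

import { ElasticsearchClient } from '../../../elasticsearch';
import { updateAliases } from './';
import type { AliasAction } from './';

// Sketch only: atomically point the alias at the new index and drop the old one.
async function swapAlias(client: ElasticsearchClient) {
  const aliasActions: AliasAction[] = [
    { add: { index: '.kibana_2', alias: '.kibana' } },
    { remove: { index: '.kibana_1', alias: '.kibana', must_exist: true } },
  ];
  return updateAliases({ client, aliasActions })();
}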
- return Either.right('update_aliases_succeeded' as const); - }) - .catch((err: EsErrors.ElasticsearchClientError) => { - if (err instanceof EsErrors.ResponseError) { - if (err?.body?.error?.type === 'index_not_found_exception') { - return Either.left({ - type: 'index_not_found_exception' as const, - index: err.body.error.index, - }); - } else if ( - err?.body?.error?.type === 'illegal_argument_exception' && - err?.body?.error?.reason?.match( - /The provided expression \[.+\] matches an alias, specify the corresponding concrete indices instead./ - ) - ) { - return Either.left({ type: 'remove_index_not_a_concrete_index' as const }); - } else if ( - err?.body?.error?.type === 'aliases_not_found_exception' || - (err?.body?.error?.type === 'resource_not_found_exception' && - err?.body?.error?.reason?.match(/required alias \[.+\] does not exist/)) - ) { - return Either.left({ - type: 'alias_not_found_exception' as const, - }); - } - } - throw err; - }) - .catch(catchRetryableEsClientErrors); -}; - /** @internal */ export interface AcknowledgeResponse { acknowledged: boolean; shardsAcknowledged: boolean; } - -function aliasArrayToRecord(aliases: string[]): Record { - const result: Record = {}; - for (const alias of aliases) { - result[alias] = {}; - } - return result; -} - -/** @internal */ -export interface CreateIndexParams { - client: ElasticsearchClient; - indexName: string; - mappings: IndexMapping; - aliases?: string[]; -} -/** - * Creates an index with the given mappings - * - * @remarks - * This method adds some additional logic to the ES create index API: - * - it is idempotent, if it gets called multiple times subsequent calls will - * wait for the first create operation to complete (up to 60s) - * - the first call will wait up to 120s for the cluster state and all shards - * to be updated. - */ -export const createIndex = ({ - client, - indexName, - mappings, - aliases = [], -}: CreateIndexParams): TaskEither.TaskEither => { - const createIndexTask: TaskEither.TaskEither< - RetryableEsClientError, - AcknowledgeResponse - > = () => { - const aliasesObject = aliasArrayToRecord(aliases); - - return client.indices - .create( - { - index: indexName, - // wait until all shards are available before creating the index - // (since number_of_shards=1 this does not have any effect atm) - wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, - // Wait up to 60s for the cluster state to update and all shards to be - // started - timeout: DEFAULT_TIMEOUT, - body: { - mappings, - aliases: aliasesObject, - settings: { - index: { - // ES rule of thumb: shards should be several GB to 10's of GB, so - // Kibana is unlikely to cross that limit. - number_of_shards: 1, - auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, - // Set an explicit refresh interval so that we don't inherit the - // value from incorrectly configured index templates (not required - // after we adopt system indices) - refresh_interval: '1s', - // Bump priority so that recovery happens before newer indices - priority: 10, - }, - }, - }, - }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) - .then((res) => { - /** - * - acknowledged=false, we timed out before the cluster state was - * updated on all nodes with the newly created index, but it - * probably will be created sometime soon. 
- * - shards_acknowledged=false, we timed out before all shards were - * started - * - acknowledged=true, shards_acknowledged=true, index creation complete - */ - return Either.right({ - acknowledged: res.body.acknowledged, - shardsAcknowledged: res.body.shards_acknowledged, - }); - }) - .catch((error) => { - if (error?.body?.error?.type === 'resource_already_exists_exception') { - /** - * If the target index already exists it means a previous create - * operation had already been started. However, we can't be sure - * that all shards were started so return shardsAcknowledged: false - */ - return Either.right({ - acknowledged: true, - shardsAcknowledged: false, - }); - } else { - throw error; - } - }) - .catch(catchRetryableEsClientErrors); - }; - - return pipe( - createIndexTask, - TaskEither.chain((res) => { - if (res.acknowledged && res.shardsAcknowledged) { - // If the cluster state was updated and all shards ackd we're done - return TaskEither.right('create_index_succeeded'); - } else { - // Otherwise, wait until the target index has a 'yellow' status. - return pipe( - waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }), - TaskEither.map(() => { - /** When the index status is 'yellow' we know that all shards were started */ - return 'create_index_succeeded'; - }) - ); - } - }) - ); -}; - -/** @internal */ -export interface UpdateAndPickupMappingsResponse { - taskId: string; -} - -/** @internal */ -export interface UpdateAndPickupMappingsParams { - client: ElasticsearchClient; - index: string; - mappings: IndexMapping; -} -/** - * Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping - * changes are "picked up". Returns a taskId to track progress. - */ -export const updateAndPickupMappings = ({ - client, - index, - mappings, -}: UpdateAndPickupMappingsParams): TaskEither.TaskEither< - RetryableEsClientError, - UpdateAndPickupMappingsResponse -> => { - const putMappingTask: TaskEither.TaskEither< - RetryableEsClientError, - 'update_mappings_succeeded' - > = () => { - return client.indices - .putMapping({ - index, - timeout: DEFAULT_TIMEOUT, - body: mappings, - }) - .then((res) => { - // Ignore `acknowledged: false`. When the coordinating node accepts - // the new cluster state update but not all nodes have applied the - // update within the timeout `acknowledged` will be false. However, - // retrying this update will always immediately result in `acknowledged: - // true` even if there are still nodes which are falling behind with - // cluster state updates. - // For updateAndPickupMappings this means that there is the potential - // that some existing document's fields won't be picked up if the node - // on which the Kibana shard is running has fallen behind with cluster - // state updates and the mapping update wasn't applied before we run - // `pickupUpdatedMappings`. ES tries to limit this risk by blocking - // index operations (including update_by_query used by - // updateAndPickupMappings) if there are pending mappings changes. But - // not all mapping changes will prevent this. 
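[Reviewer aside, not part of the patch] A hedged sketch of how the mappings update is paired with the pickup task; the timeout value is assumed, and the params type is the one exported from the new actions index.

import * as Either from 'fp-ts/lib/Either';
import { updateAndPickupMappings, waitForPickupUpdatedMappingsTask } from './';
import type { UpdateAndPickupMappingsParams } from './';

// Sketch only: put the new mappings, then wait for the pickup task to finish.
async function updateMappingsAndWait(params: UpdateAndPickupMappingsParams) {
  const updated = await updateAndPickupMappings(params)();
  if (Either.isLeft(updated)) {
    return updated;
  }
  return waitForPickupUpdatedMappingsTask({
    client: params.client,
    taskId: updated.right.taskId,
    timeout: '60s', // assumed; the migration state machine supplies its own value
  })();
}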
- return Either.right('update_mappings_succeeded' as const); - }) - .catch(catchRetryableEsClientErrors); - }; - - return pipe( - putMappingTask, - TaskEither.chain((res) => { - return pickupUpdatedMappings(client, index); - }) - ); -}; - -/** @internal */ -export interface SearchResponse { - outdatedDocuments: SavedObjectsRawDoc[]; -} - -interface SearchForOutdatedDocumentsOptions { - batchSize: number; - targetIndex: string; - outdatedDocumentsQuery?: estypes.QueryContainer; +// Map of left response 'type' string -> response interface +export interface ActionErrorTypeMap { + wait_for_task_completion_timeout: WaitForTaskCompletionTimeout; + retryable_es_client_error: RetryableEsClientError; + index_not_found_exception: IndexNotFound; + target_index_had_write_block: TargetIndexHadWriteBlock; + incompatible_mapping_exception: IncompatibleMappingException; + alias_not_found_exception: AliasNotFound; + remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex; + documents_transform_failed: DocumentsTransformFailed; } /** - * Search for outdated saved object documents with the provided query. Will - * return one batch of documents. Searching should be repeated until no more - * outdated documents can be found. - * - * Used for testing only + * Type guard for narrowing the type of a left */ -export const searchForOutdatedDocuments = ( - client: ElasticsearchClient, - options: SearchForOutdatedDocumentsOptions -): TaskEither.TaskEither => () => { - return client - .search({ - index: options.targetIndex, - // Return the _seq_no and _primary_term so we can use optimistic - // concurrency control for updates - seq_no_primary_term: true, - size: options.batchSize, - body: { - query: options.outdatedDocumentsQuery, - // Optimize search performance by sorting by the "natural" index order - sort: ['_doc'], - }, - // Return an error when targeting missing or closed indices - allow_no_indices: false, - // Don't return partial results if timeouts or shard failures are - // encountered. This is important because 0 search hits is interpreted as - // there being no more outdated documents left that require - // transformation. Although the default is `false`, we set this - // explicitly to avoid users overriding the - // search.default_allow_partial_results cluster setting to true. - allow_partial_search_results: false, - // Improve performance by not calculating the total number of hits - // matching the query. - track_total_hits: false, - // Reduce the response payload size by only returning the data we care about - filter_path: [ - 'hits.hits._id', - 'hits.hits._source', - 'hits.hits._seq_no', - 'hits.hits._primary_term', - ], - }) - .then((res) => - Either.right({ outdatedDocuments: (res.body.hits?.hits as SavedObjectsRawDoc[]) ?? [] }) - ) - .catch(catchRetryableEsClientErrors); -}; - -/** @internal */ -export interface BulkOverwriteTransformedDocumentsParams { - client: ElasticsearchClient; - index: string; - transformedDocs: SavedObjectsRawDoc[]; - refresh?: estypes.Refresh; +export function isLeftTypeof( + res: any, + typeString: T +): res is ActionErrorTypeMap[T] { + return res.type === typeString; } -/** - * Write the up-to-date transformed documents to the index, overwriting any - * documents that are still on their outdated version. 
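[Reviewer aside, not part of the patch] Writing the transformed documents back uses optimistic concurrency per document; in this sketch the index name is a placeholder and transformedDocs is assumed to come from the transform step.

import type { SavedObjectsRawDoc } from '../../serialization';
import { ElasticsearchClient } from '../../../elasticsearch';
import { bulkOverwriteTransformedDocuments } from './';

// Sketch only: each doc's _seq_no/_primary_term guards against double writes.
const writeBack = (client: ElasticsearchClient, transformedDocs: SavedObjectsRawDoc[]) =>
  bulkOverwriteTransformedDocuments({
    client,
    index: '.kibana_2', // placeholder
    transformedDocs,
    refresh: 'wait_for', // optional; the action defaults to false
  })();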
- */ -export const bulkOverwriteTransformedDocuments = ({ - client, - index, - transformedDocs, - refresh = false, -}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither< - RetryableEsClientError, - 'bulk_index_succeeded' -> => () => { - return client - .bulk({ - // Because we only add aliases in the MARK_VERSION_INDEX_READY step we - // can't bulkIndex to an alias with require_alias=true. This means if - // users tamper during this operation (delete indices or restore a - // snapshot), we could end up auto-creating an index without the correct - // mappings. Such tampering could lead to many other problems and is - // probably unlikely so for now we'll accept this risk and wait till - // system indices puts in place a hard control. - require_alias: false, - wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, - refresh, - filter_path: ['items.*.error'], - body: transformedDocs.flatMap((doc) => { - return [ - { - index: { - _index: index, - _id: doc._id, - // overwrite existing documents - op_type: 'index', - // use optimistic concurrency control to ensure that outdated - // documents are only overwritten once with the latest version - if_seq_no: doc._seq_no, - if_primary_term: doc._primary_term, - }, - }, - doc._source, - ]; - }), - }) - .then((res) => { - // Filter out version_conflict_engine_exception since these just mean - // that another instance already updated these documents - const errors = (res.body.items ?? []).filter( - (item) => item.index?.error?.type !== 'version_conflict_engine_exception' - ); - if (errors.length === 0) { - return Either.right('bulk_index_succeeded' as const); - } else { - throw new Error(JSON.stringify(errors)); - } - }) - .catch(catchRetryableEsClientErrors); -}; diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts similarity index 99% rename from src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts rename to src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts index 67a2685caf3d6..b508a6198bfb3 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts @@ -6,11 +6,11 @@ * Side Public License, v 1. 
*/ -import { ElasticsearchClient } from '../../../'; -import { InternalCoreStart } from '../../../internal_types'; -import * as kbnTestServer from '../../../../test_helpers/kbn_server'; -import { Root } from '../../../root'; -import { SavedObjectsRawDoc } from '../../serialization'; +import { ElasticsearchClient } from '../../../../'; +import { InternalCoreStart } from '../../../../internal_types'; +import * as kbnTestServer from '../../../../../test_helpers/kbn_server'; +import { Root } from '../../../../root'; +import { SavedObjectsRawDoc } from '../../../serialization'; import { bulkOverwriteTransformedDocuments, cloneIndex, @@ -37,11 +37,11 @@ import { removeWriteBlock, transformDocs, waitForIndexStatusYellow, -} from '../actions'; +} from '../../actions'; import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; import { ResponseError } from '@elastic/elasticsearch/lib/errors'; -import { DocumentsTransformFailed, DocumentsTransformSuccess } from '../../migrations/core'; +import { DocumentsTransformFailed, DocumentsTransformSuccess } from '../../../migrations/core'; import { TaskEither } from 'fp-ts/lib/TaskEither'; const { startES } = kbnTestServer.createTestServers({ diff --git a/src/core/server/saved_objects/migrationsv2/actions/open_pit.test.ts b/src/core/server/saved_objects/migrationsv2/actions/open_pit.test.ts new file mode 100644 index 0000000000000..c8fc29d06f42f --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/open_pit.test.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { openPit } from './open_pit'; + +describe('openPit', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = openPit({ client, index: 'my_index' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/open_pit.ts b/src/core/server/saved_objects/migrationsv2/actions/open_pit.ts new file mode 100644 index 0000000000000..e740dc00ac27e --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/open_pit.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +/** @internal */ +export interface OpenPitResponse { + pitId: string; +} + +/** @internal */ +export interface OpenPitParams { + client: ElasticsearchClient; + index: string; +} +// how long ES should keep PIT alive +export const pitKeepAlive = '10m'; +/* + * Creates a lightweight view of data when the request has been initiated. + * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html + * */ +export const openPit = ({ + client, + index, +}: OpenPitParams): TaskEither.TaskEither => () => { + return client + .openPointInTime({ + index, + keep_alive: pitKeepAlive, + }) + .then((response) => Either.right({ pitId: response.body.id })) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.test.ts b/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.test.ts new file mode 100644 index 0000000000000..e319d4149dd1a --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.test.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { pickupUpdatedMappings } from './pickup_updated_mappings'; + +describe('pickupUpdatedMappings', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = pickupUpdatedMappings(client, 'my_index'); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.ts b/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.ts new file mode 100644 index 0000000000000..8cc609e5277bc --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/pickup_updated_mappings.ts @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { BATCH_SIZE } from './constants'; +export interface UpdateByQueryResponse { + taskId: string; +} + +/** + * Pickup updated mappings by performing an update by query operation on all + * documents in the index. Returns a task ID which can be + * tracked for progress. + * + * @remarks When mappings are updated to add a field which previously wasn't + * mapped Elasticsearch won't automatically add existing documents to it's + * internal search indices. So search results on this field won't return any + * existing documents. By running an update by query we essentially refresh + * these the internal search indices for all existing documents. + * This action uses `conflicts: 'proceed'` allowing several Kibana instances + * to run this in parallel. + */ +export const pickupUpdatedMappings = ( + client: ElasticsearchClient, + index: string +): TaskEither.TaskEither => () => { + return client + .updateByQuery({ + // Ignore version conflicts that can occur from parallel update by query operations + conflicts: 'proceed', + // Return an error when targeting missing or closed indices + allow_no_indices: false, + index, + // How many documents to update per batch + scroll_size: BATCH_SIZE, + // force a refresh so that we can query the updated index immediately + // after the operation completes + refresh: true, + // Create a task and return task id instead of blocking until complete + wait_for_completion: false, + }) + .then(({ body: { task: taskId } }) => { + return Either.right({ taskId: String(taskId!) }); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.test.ts b/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.test.ts new file mode 100644 index 0000000000000..0d8d76b45a57b --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.test.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { readWithPit } from './read_with_pit'; + +describe('readWithPit', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. 
+ const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = readWithPit({ + client, + pitId: 'pitId', + query: { match_all: {} }, + batchSize: 10_000, + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.ts b/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.ts new file mode 100644 index 0000000000000..16f1df05f26b3 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/read_with_pit.ts @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import type { estypes } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import type { SavedObjectsRawDoc } from '../../serialization'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { pitKeepAlive } from './open_pit'; + +/** @internal */ +export interface ReadWithPit { + outdatedDocuments: SavedObjectsRawDoc[]; + readonly lastHitSortValue: number[] | undefined; + readonly totalHits: number | undefined; +} + +/** @internal */ +export interface ReadWithPitParams { + client: ElasticsearchClient; + pitId: string; + query: estypes.QueryContainer; + batchSize: number; + searchAfter?: number[]; + seqNoPrimaryTerm?: boolean; +} + +/* + * Requests documents from the index using PIT mechanism. + * */ +export const readWithPit = ({ + client, + pitId, + query, + batchSize, + searchAfter, + seqNoPrimaryTerm, +}: ReadWithPitParams): TaskEither.TaskEither => () => { + return client + .search({ + seq_no_primary_term: seqNoPrimaryTerm, + body: { + // Sort fields are required to use searchAfter + sort: { + // the most efficient option as order is not important for the migration + _shard_doc: { order: 'asc' }, + }, + pit: { id: pitId, keep_alive: pitKeepAlive }, + size: batchSize, + search_after: searchAfter, + /** + * We want to know how many documents we need to process so we can log the progress. + * But we also want to increase the performance of these requests, + * so we ask ES to report the total count only on the first request (when searchAfter does not exist) + */ + track_total_hits: typeof searchAfter === 'undefined', + query, + }, + }) + .then((response) => { + const totalHits = + typeof response.body.hits.total === 'number' + ? 
response.body.hits.total // This format is to be removed in 8.0 + : response.body.hits.total?.value; + const hits = response.body.hits.hits; + + if (hits.length > 0) { + return Either.right({ + // @ts-expect-error @elastic/elasticsearch _source is optional + outdatedDocuments: hits as SavedObjectsRawDoc[], + lastHitSortValue: hits[hits.length - 1].sort as number[], + totalHits, + }); + } + + return Either.right({ + outdatedDocuments: [], + lastHitSortValue: undefined, + totalHits, + }); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/refresh_index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/refresh_index.test.ts new file mode 100644 index 0000000000000..0ebdb2b2b1851 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/refresh_index.test.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { refreshIndex } from './refresh_index'; + +describe('refreshIndex', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = refreshIndex({ client, targetIndex: 'target_index' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/refresh_index.ts b/src/core/server/saved_objects/migrationsv2/actions/refresh_index.ts new file mode 100644 index 0000000000000..e7bcbfb7d2d53 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/refresh_index.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; + +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; + +/** @internal */ +export interface RefreshIndexParams { + client: ElasticsearchClient; + targetIndex: string; +} +/** + * Wait for Elasticsearch to reindex all the changes. 
+ */ +export const refreshIndex = ({ + client, + targetIndex, +}: RefreshIndexParams): TaskEither.TaskEither< + RetryableEsClientError, + { refreshed: boolean } +> => () => { + return client.indices + .refresh({ + index: targetIndex, + }) + .then(() => { + return Either.right({ refreshed: true }); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/reindex.test.ts b/src/core/server/saved_objects/migrationsv2/actions/reindex.test.ts new file mode 100644 index 0000000000000..f53368bd9321b --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/reindex.test.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as Option from 'fp-ts/lib/Option'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { reindex } from './reindex'; + +describe('reindex', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = reindex({ + client, + sourceIndex: 'my_source_index', + targetIndex: 'my_target_index', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: {}, + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/reindex.ts b/src/core/server/saved_objects/migrationsv2/actions/reindex.ts new file mode 100644 index 0000000000000..ca8d3b594703c --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/reindex.ts @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import * as Option from 'fp-ts/lib/Option'; +import type { estypes } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { BATCH_SIZE } from './constants'; + +/** @internal */ +export interface ReindexResponse { + taskId: string; +} +/** @internal */ +export interface ReindexParams { + client: ElasticsearchClient; + sourceIndex: string; + targetIndex: string; + reindexScript: Option.Option; + requireAlias: boolean; + /* When reindexing we use a source query to exclude saved objects types which + * are no longer used. These saved objects will still be kept in the outdated + * index for backup purposes, but won't be available in the upgraded index. + */ + unusedTypesQuery: estypes.QueryContainer; +} +/** + * Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a + * task ID which can be tracked for progress. + * + * @remarks This action is idempotent allowing several Kibana instances to run + * this in parallel. By using `op_type: 'create', conflicts: 'proceed'` there + * will be only one write per reindexed document. + */ +export const reindex = ({ + client, + sourceIndex, + targetIndex, + reindexScript, + requireAlias, + unusedTypesQuery, +}: ReindexParams): TaskEither.TaskEither => () => { + return client + .reindex({ + // Require targetIndex to be an alias. Prevents a new index from being + // created if targetIndex doesn't exist. + require_alias: requireAlias, + body: { + // Ignore version conflicts from existing documents + conflicts: 'proceed', + source: { + index: sourceIndex, + // Set reindex batch size + size: BATCH_SIZE, + // Exclude saved object types + query: unusedTypesQuery, + }, + dest: { + index: targetIndex, + // Don't override existing documents, only create if missing + op_type: 'create', + }, + script: Option.fold( + () => undefined, + (script) => ({ + source: script, + lang: 'painless', + }) + )(reindexScript), + }, + // force a refresh so that we can query the target index + refresh: true, + // Create a task and return task id instead of blocking until complete + wait_for_completion: false, + }) + .then(({ body: { task: taskId } }) => { + return Either.right({ taskId: String(taskId) }); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.test.ts b/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.test.ts new file mode 100644 index 0000000000000..497211cb693ab --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.test.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import { removeWriteBlock } from './remove_write_block'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); + +describe('removeWriteBlock', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = removeWriteBlock({ client, index: 'my_index' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = removeWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.ts b/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.ts new file mode 100644 index 0000000000000..c55e4a235fbf1 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/remove_write_block.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; + +/** @internal */ +export interface RemoveWriteBlockParams { + client: ElasticsearchClient; + index: string; +} +/** + * Removes a write block from an index + */ +export const removeWriteBlock = ({ + client, + index, +}: RemoveWriteBlockParams): TaskEither.TaskEither< + RetryableEsClientError, + 'remove_write_block_succeeded' +> => () => { + return client.indices + .putSettings<{ + acknowledged: boolean; + shards_acknowledged: boolean; + }>( + { + index, + // Don't change any existing settings + preserve_existing: true, + body: { + index: { + blocks: { + write: false, + }, + }, + }, + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ) + .then((res) => { + return res.body.acknowledged === true + ? 
Either.right('remove_write_block_succeeded' as const) + : Either.left({ + type: 'retryable_es_client_error' as const, + message: 'remove_write_block_failed', + }); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.test.ts b/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.test.ts new file mode 100644 index 0000000000000..ab133e9a564be --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.test.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { searchForOutdatedDocuments } from './search_for_outdated_documents'; + +describe('searchForOutdatedDocuments', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = searchForOutdatedDocuments(client, { + batchSize: 1000, + targetIndex: 'new_index', + outdatedDocumentsQuery: {}, + }); + + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + + it('configures request according to given parameters', async () => { + const esClient = elasticsearchClientMock.createInternalClient(); + const query = {}; + const targetIndex = 'new_index'; + const batchSize = 1000; + const task = searchForOutdatedDocuments(esClient, { + batchSize, + targetIndex, + outdatedDocumentsQuery: query, + }); + + await task(); + + expect(esClient.search).toHaveBeenCalledTimes(1); + expect(esClient.search).toHaveBeenCalledWith( + expect.objectContaining({ + index: targetIndex, + size: batchSize, + body: expect.objectContaining({ query }), + }) + ); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.ts new file mode 100644 index 0000000000000..7406cd35b1593 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.ts @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import type { estypes } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import type { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; + +/** @internal */ +export interface SearchResponse { + outdatedDocuments: SavedObjectsRawDoc[]; +} + +export interface SearchForOutdatedDocumentsOptions { + batchSize: number; + targetIndex: string; + outdatedDocumentsQuery?: estypes.QueryContainer; +} + +/** + * Search for outdated saved object documents with the provided query. Will + * return one batch of documents. Searching should be repeated until no more + * outdated documents can be found. + * + * Used for testing only + */ +export const searchForOutdatedDocuments = ( + client: ElasticsearchClient, + options: SearchForOutdatedDocumentsOptions +): TaskEither.TaskEither<RetryableEsClientError, SearchResponse> => () => { + return client + .search<SavedObjectsRawDocSource>({ + index: options.targetIndex, + // Return the _seq_no and _primary_term so we can use optimistic + // concurrency control for updates + seq_no_primary_term: true, + size: options.batchSize, + body: { + query: options.outdatedDocumentsQuery, + // Optimize search performance by sorting by the "natural" index order + sort: ['_doc'], + }, + // Return an error when targeting missing or closed indices + allow_no_indices: false, + // Don't return partial results if timeouts or shard failures are + // encountered. This is important because 0 search hits is interpreted as + // there being no more outdated documents left that require + // transformation. Although the default is `false`, we set this + // explicitly to avoid users overriding the + // search.default_allow_partial_results cluster setting to true. + allow_partial_search_results: false, + // Improve performance by not calculating the total number of hits + // matching the query. + track_total_hits: false, + // Reduce the response payload size by only returning the data we care about + filter_path: [ + 'hits.hits._id', + 'hits.hits._source', + 'hits.hits._seq_no', + 'hits.hits._primary_term', + ], + }) + .then((res) => + Either.right({ outdatedDocuments: (res.body.hits?.hits as SavedObjectsRawDoc[]) ?? [] }) + ) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/search_for_outdated_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/set_write_block.test.ts b/src/core/server/saved_objects/migrationsv2/actions/set_write_block.test.ts new file mode 100644 index 0000000000000..cf7b3091f38ff --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/set_write_block.test.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1.
+ */ +import { setWriteBlock } from './set_write_block'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); + +describe('setWriteBlock', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = setWriteBlock({ client, index: 'my_index' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/set_write_block.ts b/src/core/server/saved_objects/migrationsv2/actions/set_write_block.ts new file mode 100644 index 0000000000000..5aed316306cf9 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/set_write_block.ts @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { ElasticsearchClientError } from '@elastic/elasticsearch/lib/errors'; +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import type { IndexNotFound } from './'; + +/** @internal */ +export interface SetWriteBlockParams { + client: ElasticsearchClient; + index: string; +} +/** + * Sets a write block in place for the given index. If the response includes + * `acknowledged: true` all in-progress writes have drained and no further + * writes to this index will be possible. 
+ * + * The first time the write block is added to an index the response will + * include `shards_acknowledged: true` but once the block is in place, + * subsequent calls return `shards_acknowledged: false` + */ +export const setWriteBlock = ({ + client, + index, +}: SetWriteBlockParams): TaskEither.TaskEither< + IndexNotFound | RetryableEsClientError, + 'set_write_block_succeeded' +> => () => { + return ( + client.indices + .addBlock<{ + acknowledged: boolean; + shards_acknowledged: boolean; + }>( + { + index, + block: 'write', + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ) + // not typed yet + .then((res: any) => { + return res.body.acknowledged === true + ? Either.right('set_write_block_succeeded' as const) + : Either.left({ + type: 'retryable_es_client_error' as const, + message: 'set_write_block_failed', + }); + }) + .catch((e: ElasticsearchClientError) => { + if (e instanceof EsErrors.ResponseError) { + if (e.body?.error?.type === 'index_not_found_exception') { + return Either.left({ type: 'index_not_found_exception' as const, index }); + } + } + throw e; + }) + .catch(catchRetryableEsClientErrors) + ); +}; +// diff --git a/src/core/server/saved_objects/migrationsv2/actions/transform_docs.ts b/src/core/server/saved_objects/migrationsv2/actions/transform_docs.ts new file mode 100644 index 0000000000000..4c712afcff3a4 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/transform_docs.ts @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import type { TransformRawDocs } from '../types'; +import type { SavedObjectsRawDoc } from '../../serialization'; +import { + DocumentsTransformFailed, + DocumentsTransformSuccess, +} from '../../migrations/core/migrate_raw_docs'; + +/** @internal */ +export interface TransformDocsParams { + transformRawDocs: TransformRawDocs; + outdatedDocuments: SavedObjectsRawDoc[]; +} +/* + * Transform outdated docs + * */ +export const transformDocs = ({ + transformRawDocs, + outdatedDocuments, +}: TransformDocsParams): TaskEither.TaskEither< + DocumentsTransformFailed, + DocumentsTransformSuccess +> => transformRawDocs(outdatedDocuments); diff --git a/src/core/server/saved_objects/migrationsv2/actions/update_aliases.test.ts b/src/core/server/saved_objects/migrationsv2/actions/update_aliases.test.ts new file mode 100644 index 0000000000000..e2ea07d40281b --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/update_aliases.test.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { updateAliases } from './update_aliases'; +import { setWriteBlock } from './set_write_block'; + +describe('updateAliases', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = updateAliases({ client, aliasActions: [] }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/update_aliases.ts b/src/core/server/saved_objects/migrationsv2/actions/update_aliases.ts new file mode 100644 index 0000000000000..ffb8002f09212 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/update_aliases.ts @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { IndexNotFound } from './index'; + +export interface AliasNotFound { + type: 'alias_not_found_exception'; +} + +/** @internal */ +export interface RemoveIndexNotAConcreteIndex { + type: 'remove_index_not_a_concrete_index'; +} + +/** @internal */ +export type AliasAction = + | { remove_index: { index: string } } + | { remove: { index: string; alias: string; must_exist: boolean } } + | { add: { index: string; alias: string } }; + +/** @internal */ +export interface UpdateAliasesParams { + client: ElasticsearchClient; + aliasActions: AliasAction[]; +} +/** + * Calls the Update index alias API `_alias` with the provided alias actions. 
+ */ +export const updateAliases = ({ + client, + aliasActions, +}: UpdateAliasesParams): TaskEither.TaskEither< + IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, + 'update_aliases_succeeded' +> => () => { + return client.indices + .updateAliases( + { + body: { + actions: aliasActions, + }, + }, + { maxRetries: 0 } + ) + .then(() => { + // Ignore `acknowledged: false`. When the coordinating node accepts + // the new cluster state update but not all nodes have applied the + // update within the timeout `acknowledged` will be false. However, + // retrying this update will always immediately result in `acknowledged: + // true` even if there are still nodes which are falling behind with + // cluster state updates. + // The only impact for using `updateAliases` to mark the version index + // as ready is that it could take longer for other Kibana instances to + // see that the version index is ready so they are more likely to + // perform unnecessary duplicate work. + return Either.right('update_aliases_succeeded' as const); + }) + .catch((err: EsErrors.ElasticsearchClientError) => { + if (err instanceof EsErrors.ResponseError) { + if (err?.body?.error?.type === 'index_not_found_exception') { + return Either.left({ + type: 'index_not_found_exception' as const, + index: err.body.error.index, + }); + } else if ( + err?.body?.error?.type === 'illegal_argument_exception' && + err?.body?.error?.reason?.match( + /The provided expression \[.+\] matches an alias, specify the corresponding concrete indices instead./ + ) + ) { + return Either.left({ type: 'remove_index_not_a_concrete_index' as const }); + } else if ( + err?.body?.error?.type === 'aliases_not_found_exception' || + (err?.body?.error?.type === 'resource_not_found_exception' && + err?.body?.error?.reason?.match(/required alias \[.+\] does not exist/)) + ) { + return Either.left({ + type: 'alias_not_found_exception' as const, + }); + } + } + throw err; + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.test.ts b/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.test.ts new file mode 100644 index 0000000000000..3ecb990cd9e82 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.test.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { updateAndPickupMappings } from './update_and_pickup_mappings'; + +describe('updateAndPickupMappings', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response.
+ const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = updateAndPickupMappings({ + client, + index: 'new_index', + mappings: { properties: {} }, + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.ts b/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.ts new file mode 100644 index 0000000000000..8c742005a01ce --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/update_and_pickup_mappings.ts @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { pipe } from 'fp-ts/lib/pipeable'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { IndexMapping } from '../../mappings'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { pickupUpdatedMappings } from './pickup_updated_mappings'; +import { DEFAULT_TIMEOUT } from './constants'; + +/** @internal */ +export interface UpdateAndPickupMappingsResponse { + taskId: string; +} + +/** @internal */ +export interface UpdateAndPickupMappingsParams { + client: ElasticsearchClient; + index: string; + mappings: IndexMapping; +} +/** + * Updates an index's mappings and runs a pickupUpdatedMappings task so that the mapping + * changes are "picked up". Returns a taskId to track progress. + */ +export const updateAndPickupMappings = ({ + client, + index, + mappings, +}: UpdateAndPickupMappingsParams): TaskEither.TaskEither< + RetryableEsClientError, + UpdateAndPickupMappingsResponse +> => { + const putMappingTask: TaskEither.TaskEither< + RetryableEsClientError, + 'update_mappings_succeeded' + > = () => { + return client.indices + .putMapping({ + index, + timeout: DEFAULT_TIMEOUT, + body: mappings, + }) + .then((res) => { + // Ignore `acknowledged: false`. When the coordinating node accepts + // the new cluster state update but not all nodes have applied the + // update within the timeout `acknowledged` will be false. However, + // retrying this update will always immediately result in `acknowledged: + // true` even if there are still nodes which are falling behind with + // cluster state updates. + // For updateAndPickupMappings this means that there is the potential + // that some existing document's fields won't be picked up if the node + // on which the Kibana shard is running has fallen behind with cluster + // state updates and the mapping update wasn't applied before we run + // `pickupUpdatedMappings`.
ES tries to limit this risk by blocking + // index operations (including update_by_query used by + // updateAndPickupMappings) if there are pending mappings changes. But + // not all mapping changes will prevent this. + return Either.right('update_mappings_succeeded' as const); + }) + .catch(catchRetryableEsClientErrors); + }; + + return pipe( + putMappingTask, + TaskEither.chain((res) => { + return pickupUpdatedMappings(client, index); + }) + ); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/verify_reindex.ts b/src/core/server/saved_objects/migrationsv2/actions/verify_reindex.ts new file mode 100644 index 0000000000000..4db599d8fbadf --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/verify_reindex.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; + +/** @internal */ +export interface VerifyReindexParams { + client: ElasticsearchClient; + sourceIndex: string; + targetIndex: string; +} + +export const verifyReindex = ({ + client, + sourceIndex, + targetIndex, +}: VerifyReindexParams): TaskEither.TaskEither< + RetryableEsClientError | { type: 'verify_reindex_failed' }, + 'verify_reindex_succeeded' +> => () => { + const count = (index: string) => + client + .count<{ count: number }>({ + index, + // Return an error when targeting missing or closed indices + allow_no_indices: false, + }) + .then((res) => { + return res.body.count; + }); + + return Promise.all([count(sourceIndex), count(targetIndex)]) + .then(([sourceCount, targetCount]) => { + if (targetCount >= sourceCount) { + return Either.right('verify_reindex_succeeded' as const); + } else { + return Either.left({ type: 'verify_reindex_failed' as const }); + } + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.test.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.test.ts new file mode 100644 index 0000000000000..8cea34b80ffad --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.test.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { waitForIndexStatusYellow } from './wait_for_index_status_yellow'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +jest.mock('./catch_retryable_es_client_errors'); + +describe('waitForIndexStatusYellow', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = waitForIndexStatusYellow({ + client, + index: 'my_index', + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.ts new file mode 100644 index 0000000000000..307c77ee5b89c --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_index_status_yellow.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +import { DEFAULT_TIMEOUT } from './constants'; + +/** @internal */ +export interface WaitForIndexStatusYellowParams { + client: ElasticsearchClient; + index: string; + timeout?: string; +} +/** + * A yellow index status means the index's primary shard is allocated and the + * index is ready for searching/indexing documents, but ES wasn't able to + * allocate the replicas. When migrations proceed with a yellow index it means + * we don't have as much data-redundancy as we could have, but waiting for + * replicas would mean that v2 migrations fail where v1 migrations would have + * succeeded. It doesn't feel like it's Kibana's job to force users to keep + * their clusters green and even if it's green when we migrate it can turn + * yellow at any point in the future. So ultimately data-redundancy is up to + * users to maintain. 
+ */ +export const waitForIndexStatusYellow = ({ + client, + index, + timeout = DEFAULT_TIMEOUT, +}: WaitForIndexStatusYellowParams): TaskEither.TaskEither<RetryableEsClientError, {}> => () => { + return client.cluster + .health({ index, wait_for_status: 'yellow', timeout }) + .then(() => { + return Either.right({}); + }) + .catch(catchRetryableEsClientErrors); +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.test.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.test.ts new file mode 100644 index 0000000000000..f7c380be9427c --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.test.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { waitForPickupUpdatedMappingsTask } from './wait_for_pickup_updated_mappings_task'; +import { setWriteBlock } from './set_write_block'; + +describe('waitForPickupUpdatedMappingsTask', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = waitForPickupUpdatedMappingsTask({ + client, + taskId: 'my task id', + timeout: '60s', + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.ts new file mode 100644 index 0000000000000..02f7c3455cec9 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_pickup_updated_mappings_task.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements.
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import * as Option from 'fp-ts/lib/Option'; +import { flow } from 'fp-ts/lib/function'; +import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; +import { RetryableEsClientError } from './catch_retryable_es_client_errors'; + +export const waitForPickupUpdatedMappingsTask = flow( + waitForTask, + TaskEither.chain( + ( + res + ): TaskEither.TaskEither< + RetryableEsClientError | WaitForTaskCompletionTimeout, + 'pickup_updated_mappings_succeeded' + > => { + // We don't catch or type failures/errors because they should never + // occur in our migration algorithm and we don't have any business logic + // for dealing with it. If something happens we'll just crash and try + // again. + if (Option.isSome(res.failures)) { + throw new Error( + 'pickupUpdatedMappings task failed with the following failures:\n' + + JSON.stringify(res.failures.value) + ); + } else if (Option.isSome(res.error)) { + throw new Error( + 'pickupUpdatedMappings task failed with the following error:\n' + + JSON.stringify(res.error.value) + ); + } else { + return TaskEither.right('pickup_updated_mappings_succeeded' as const); + } + } + ) +); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.test.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.test.ts new file mode 100644 index 0000000000000..f6a236aab5c85 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.test.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +jest.mock('./catch_retryable_es_client_errors'); +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { waitForReindexTask } from './wait_for_reindex_task'; +import { setWriteBlock } from './set_write_block'; + +describe('waitForReindexTask', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. 
+ const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const nonRetryableError = new Error('crash'); + const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError) + ); + + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = waitForReindexTask({ client, taskId: 'my task id', timeout: '60s' }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + it('re-throws non retry-able errors', async () => { + const task = setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); + await task(); + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts new file mode 100644 index 0000000000000..fcadb5e80298a --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import * as Option from 'fp-ts/lib/Option'; +import { flow } from 'fp-ts/lib/function'; +import { RetryableEsClientError } from './catch_retryable_es_client_errors'; +import type { IndexNotFound, WaitForReindexTaskFailure, TargetIndexHadWriteBlock } from './index'; +import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; + +export interface IncompatibleMappingException { + type: 'incompatible_mapping_exception'; +} +export const waitForReindexTask = flow( + waitForTask, + TaskEither.chain( + ( + res + ): TaskEither.TaskEither< + | IndexNotFound + | TargetIndexHadWriteBlock + | IncompatibleMappingException + | RetryableEsClientError + | WaitForTaskCompletionTimeout, + 'reindex_succeeded' + > => { + const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) => + type === 'cluster_block_exception' && + reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/); + + const failureIsIncompatibleMappingException = ({ + cause: { type, reason }, + }: WaitForReindexTaskFailure) => + type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; + + if (Option.isSome(res.error)) { + if (res.error.value.type === 'index_not_found_exception') { + return TaskEither.left({ + type: 'index_not_found_exception' as const, + index: res.error.value.index, + }); + } else { + throw new Error('Reindex failed with the following error:\n' + JSON.stringify(res.error)); + } + } else if (Option.isSome(res.failures)) { + if (res.failures.value.every(failureIsAWriteBlock)) { + return TaskEither.left({ type: 'target_index_had_write_block' as const }); + } else if (res.failures.value.every(failureIsIncompatibleMappingException)) { + return TaskEither.left({ type: 'incompatible_mapping_exception' as const }); + } else { + throw new Error( + 'Reindex failed with the following failures:\n' + JSON.stringify(res.failures.value) + ); + } + } else { + return TaskEither.right('reindex_succeeded' as const); + } + } + ) +); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.test.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.test.ts new file mode 100644 index 0000000000000..c7ca9bf36a2c6 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.test.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { waitForTask } from './wait_for_task'; +import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +jest.mock('./catch_retryable_es_client_errors'); + +describe('waitForTask', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + // Create a mock client that rejects all methods with a 503 status code + // response. 
+ const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + describe('waitForPickupUpdatedMappingsTask', () => { + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = waitForTask({ + client, + taskId: 'my task id', + timeout: '60s', + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.ts new file mode 100644 index 0000000000000..4e3631797e34b --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_task.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as Either from 'fp-ts/lib/Either'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; +import * as Option from 'fp-ts/lib/Option'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { + catchRetryableEsClientErrors, + RetryableEsClientError, +} from './catch_retryable_es_client_errors'; +/** @internal */ +export interface WaitForTaskResponse { + error: Option.Option<{ type: string; reason: string; index: string }>; + completed: boolean; + failures: Option.Option<any[]>; + description?: string; +} + +/** + * After waiting for the specified timeout, the task has not yet completed. + * + * When querying the tasks API we use `wait_for_completion=true` to block the + * request until the task completes. If after the `timeout`, the task still has + * not completed we return this error. This does not mean that the task itself + * has reached a timeout, Elasticsearch will continue to run the task. + */ +export interface WaitForTaskCompletionTimeout { + /** After waiting for the specified timeout, the task has not yet completed. */ + readonly type: 'wait_for_task_completion_timeout'; + readonly message: string; + readonly error?: Error; +} + +const catchWaitForTaskCompletionTimeout = ( + e: EsErrors.ResponseError +): Either.Either<WaitForTaskCompletionTimeout, never> => { + if ( + e.body?.error?.type === 'timeout_exception' || + e.body?.error?.type === 'receive_timeout_transport_exception' + ) { + return Either.left({ + type: 'wait_for_task_completion_timeout' as const, + message: `[${e.body.error.type}] ${e.body.error.reason}`, + error: e, + }); + } else { + throw e; + } +}; + +/** @internal */ +export interface WaitForTaskParams { + client: ElasticsearchClient; + taskId: string; + timeout: string; +} +/** + * Blocks for up to 60s or until a task completes.
+ * + * TODO: delete completed tasks + */ +export const waitForTask = ({ + client, + taskId, + timeout, +}: WaitForTaskParams): TaskEither.TaskEither< + RetryableEsClientError | WaitForTaskCompletionTimeout, + WaitForTaskResponse +> => () => { + return client.tasks + .get({ + task_id: taskId, + wait_for_completion: true, + timeout, + }) + .then((res) => { + const body = res.body; + const failures = body.response?.failures ?? []; + return Either.right({ + completed: body.completed, + // @ts-expect-error @elastic/elasticsearch GetTaskResponse doesn't declare `error` property + error: Option.fromNullable(body.error), + failures: failures.length > 0 ? Option.some(failures) : Option.none, + description: body.task.description, + }); + }) + .catch(catchWaitForTaskCompletionTimeout) + .catch(catchRetryableEsClientErrors); +};
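All of the actions in this patch share the same shape: a factory returns an fp-ts TaskEither, i.e. a lazy () => Promise<Either<Failure, Result>> that only talks to Elasticsearch when invoked by the migration state machine. A minimal sketch of running a single action in isolation follows, assuming only that an ElasticsearchClient instance is in scope; the runSetWriteBlock wrapper and the '.kibana_1' index name are illustrative assumptions, not part of this change.

import * as Either from 'fp-ts/lib/Either';
import { ElasticsearchClient } from '../../../elasticsearch';
import { setWriteBlock } from './set_write_block';

// Hypothetical driver, for illustration only: runs one action and inspects
// the Either it resolves to.
export async function runSetWriteBlock(client: ElasticsearchClient): Promise<void> {
  // '.kibana_1' is an example index name, not one hard-coded by the migration.
  const task = setWriteBlock({ client, index: '.kibana_1' });
  // Executing the TaskEither performs the request and resolves to an Either.
  const result = await task();
  if (Either.isRight(result)) {
    console.log(result.right); // 'set_write_block_succeeded'
  } else {
    console.log(result.left.type); // 'index_not_found_exception' | 'retryable_es_client_error'
  }
}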