Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Splits migrationsv2 actions and unit tests into separate files (#101200) #101304

Merged
merged 1 commit into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';

describe('bulkOverwriteTransformedDocuments', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});
try {
await task();
} catch (e) {
/** ignore */
}

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '../../../elasticsearch';
import type { SavedObjectsRawDoc } from '../../serialization';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
index: string;
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}
/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
*/
export const bulkOverwriteTransformedDocuments = ({
client,
index,
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
'bulk_index_succeeded'
> => () => {
return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
// can't bulkIndex to an alias with require_alias=true. This means if
// users tamper during this operation (delete indices or restore a
// snapshot), we could end up auto-creating an index without the correct
// mappings. Such tampering could lead to many other problems and is
// probably unlikely so for now we'll accept this risk and wait till
// system indices puts in place a hard control.
require_alias: false,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
// that another instance already updated these documents
const errors = (res.body.items ?? []).filter(
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
);
if (errors.length === 0) {
return Either.right('bulk_index_succeeded' as const);
} else {
throw new Error(JSON.stringify(errors));
}
})
.catch(catchRetryableEsClientErrors);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { errors as EsErrors } from '@elastic/elasticsearch';
import { cloneIndex } from './clone_index';
import { setWriteBlock } from './set_write_block';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');

describe('cloneIndex', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const nonRetryableError = new Error('crash');
const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('re-throws non retry-able errors', async () => {
const task = setWriteBlock({
client: clientWithNonRetryableError,
index: 'my_index',
});
await task();
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
});
141 changes: 141 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/clone_index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { pipe } from 'fp-ts/lib/pipeable';
import { ElasticsearchClient } from '../../../elasticsearch';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import type { IndexNotFound, AcknowledgeResponse } from './';
import { waitForIndexStatusYellow } from './wait_for_index_status_yellow';
import {
DEFAULT_TIMEOUT,
INDEX_AUTO_EXPAND_REPLICAS,
INDEX_NUMBER_OF_SHARDS,
WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
} from './constants';
export type CloneIndexResponse = AcknowledgeResponse;

/** @internal */
export interface CloneIndexParams {
client: ElasticsearchClient;
source: string;
target: string;
/** only used for testing */
timeout?: string;
}
/**
* Makes a clone of the source index into the target.
*
* @remarks
* This method adds some additional logic to the ES clone index API:
* - it is idempotent, if it gets called multiple times subsequent calls will
* wait for the first clone operation to complete (up to 60s)
* - the first call will wait up to 120s for the cluster state and all shards
* to be updated.
*/
export const cloneIndex = ({
client,
source,
target,
timeout = DEFAULT_TIMEOUT,
}: CloneIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
CloneIndexResponse
> => {
const cloneTask: TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound,
AcknowledgeResponse
> = () => {
return client.indices
.clone(
{
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
body: {
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
number_of_shards: INDEX_NUMBER_OF_SHARDS,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
},
},
},
timeout,
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
)
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated with the newly created index, but it probably will be
* created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, cloning complete
*/
return Either.right({
acknowledged: res.body.acknowledged,
shardsAcknowledged: res.body.shards_acknowledged,
});
})
.catch((error: EsErrors.ResponseError) => {
if (error?.body?.error?.type === 'index_not_found_exception') {
return Either.left({
type: 'index_not_found_exception' as const,
index: error.body.error.index,
});
} else if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous clone
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
} else {
throw error;
}
})
.catch(catchRetryableEsClientErrors);
};

return pipe(
cloneTask,
TaskEither.chain((res) => {
if (res.acknowledged && res.shardsAcknowledged) {
// If the cluster state was updated and all shards ackd we're done
return TaskEither.right(res);
} else {
// Otherwise, wait until the target index has a 'green' status.
return pipe(
waitForIndexStatusYellow({ client, index: target, timeout }),
TaskEither.map((value) => {
/** When the index status is 'green' we know that all shards were started */
return { acknowledged: true, shardsAcknowledged: true };
})
);
}
})
);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { closePit } from './close_pit';

describe('closePit', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = closePit({ client, pitId: 'pitId' });
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
Loading