Skip to content

Commit

Permalink
fix reindex race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
pgayvallet committed Jul 6, 2021
1 parent 7395fee commit a4ca9dc
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,96 @@
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as Either from 'fp-ts/Either';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';

jest.mock('./catch_retryable_es_client_errors');

describe('bulkOverwriteTransformedDocuments', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
_index: '.dolly',
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
error: {
type: 'version_conflict_engine_exception',
reason: 'reason',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
Expand All @@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => {

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'target_index_had_write_block',
});
});

it('throws an error if any error is not a write block exceptions', async () => {
(catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => {
throw e;
});

const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'dolly_exception',
reason: 'because',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

await expect(task()).rejects.toThrow();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { isWriteBlockException } from './es_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
import type { TargetIndexHadWriteBlock } from './index';

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
Expand All @@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams {
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}

/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
Expand All @@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
RetryableEsClientError | TargetIndexHadWriteBlock,
'bulk_index_succeeded'
> => () => {
return client
Expand Down Expand Up @@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
// that another instance already updated these documents
const errors = (res.body.items ?? []).filter(
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
);
const errors = (res.body.items ?? [])
.filter((item) => item.index?.error)
.map((item) => item.index!.error!)
.filter(({ type }) => type !== 'version_conflict_engine_exception');

if (errors.length === 0) {
return Either.right('bulk_index_succeeded' as const);
} else {
if (errors.every(isWriteBlockException)) {
return Either.left({
type: 'target_index_had_write_block' as const,
});
}
throw new Error(JSON.stringify(errors));
}
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { isIncompatibleMappingException, isWriteBlockException } from './es_errors';

describe('isWriteBlockError', () => {
it('returns true for a `index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(true);
});
it('returns true for a `moving to block index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`,
})
).toEqual(true);
});
it('returns false for incorrect type', () => {
expect(
isWriteBlockException({
type: 'not_a_cluster_block_exception_at_all',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(false);
});
});

describe('isIncompatibleMappingExceptionError', () => {
it('returns true for `strict_dynamic_mapping_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'strict_dynamic_mapping_exception',
reason: 'idk',
})
).toEqual(true);
});

it('returns true for `mapper_parsing_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'mapper_parsing_exception',
reason: 'idk',
})
).toEqual(true);
});
});
23 changes: 23 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/es_errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export interface EsErrorCause {
type: string;
reason: string;
}

export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => {
return (
type === 'cluster_block_exception' &&
reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null
);
};

export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => {
return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';
};
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { flow } from 'fp-ts/lib/function';
import { RetryableEsClientError } from './catch_retryable_es_client_errors';
import type { IndexNotFound, WaitForReindexTaskFailure, TargetIndexHadWriteBlock } from './index';
import type { IndexNotFound, TargetIndexHadWriteBlock } from './index';
import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task';
import { isWriteBlockException, isIncompatibleMappingException } from './es_errors';

export interface IncompatibleMappingException {
type: 'incompatible_mapping_exception';
}

export const waitForReindexTask = flow(
waitForTask,
TaskEither.chain(
Expand All @@ -29,15 +31,6 @@ export const waitForReindexTask = flow(
| WaitForTaskCompletionTimeout,
'reindex_succeeded'
> => {
const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) =>
type === 'cluster_block_exception' &&
reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/);

const failureIsIncompatibleMappingException = ({
cause: { type, reason },
}: WaitForReindexTaskFailure) =>
type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';

if (Option.isSome(res.error)) {
if (res.error.value.type === 'index_not_found_exception') {
return TaskEither.left({
Expand All @@ -48,9 +41,9 @@ export const waitForReindexTask = flow(
throw new Error('Reindex failed with the following error:\n' + JSON.stringify(res.error));
}
} else if (Option.isSome(res.failures)) {
if (res.failures.value.every(failureIsAWriteBlock)) {
if (res.failures.value.every(isWriteBlockException)) {
return TaskEither.left({ type: 'target_index_had_write_block' as const });
} else if (res.failures.value.every(failureIsIncompatibleMappingException)) {
} else if (res.failures.value.every(isIncompatibleMappingException)) {
return TaskEither.left({ type: 'incompatible_mapping_exception' as const });
} else {
throw new Error(
Expand Down
Loading

0 comments on commit a4ca9dc

Please sign in to comment.