Skip to content

Commit

Permalink
[ResponseOps][Alerting] fix alert conflict resolution to support create
Browse files Browse the repository at this point in the history
resolves: #190376

In PR #160572, we changed from
using just the bulk op `index` to using `create` when new alerts are
being created.

Unfortunately, the code to handle the bulk responses didn't take into
account that the bulk responses for `create`s need different handling
than `index`s.  Specifically, conflicts for `create` were being treated
as errors.

This PR changes the processing to consider additional ops besides just
`index`.
  • Loading branch information
pmuellr committed Sep 26, 2024
1 parent f8416b8 commit bd4cf3d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe('alert_conflict_resolver', () => {
});

test('no errors in bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
const { bulkRequest, bulkResponse } = getReqRes('cs is is cs is');
await resolveAlertConflicts({
logger,
esClient,
Expand Down Expand Up @@ -163,18 +163,42 @@ describe('alert_conflict_resolver', () => {
);
});

test('one conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ie');
test('one conflicted index doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is cs ic ie');
esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] });
esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] });

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc)],
await resolveAlertConflicts({
logger,
esClient,
bulkRequest,
bulkResponse,
ruleId,
ruleName,
ruleType,
});

esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2)],
});
expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts ${ruleInfo}: 2 successful, 1 conflicts, 1 errors: hallo`,
logTags
);
expect(logger.info).toHaveBeenNthCalledWith(
1,
`Retrying bulk update of 1 conflicted alerts ${ruleInfo}`,
logTags
);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded ${ruleInfo}`,
logTags
);
});

test('one conflicted create doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is cs cc ie');
esClient.mget.mockResolvedValueOnce({ docs: [getMGetResDoc(2, alertDoc)] });
esClient.bulk.mockResolvedValueOnce({ errors: false, took: 0, items: [getBulkResItem(2)] });

await resolveAlertConflicts({
logger,
Expand Down Expand Up @@ -204,7 +228,7 @@ describe('alert_conflict_resolver', () => {
});

test('multiple conflicted doc amonst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic');
const { bulkRequest, bulkResponse } = getReqRes('is cs ic cc ie ic');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)],
Expand Down Expand Up @@ -276,7 +300,9 @@ interface GetReqResResult {
/**
* takes as input a string of c, is, ic, ie tokens and builds appropriate
* bulk request and response objects to use in the tests:
* - c: create, ignored by the resolve logic
* - cs: create with success
* - cc: create with conflict
* - ce: create with error but not conflict
* - is: index with success
* - ic: index with conflict
* - ie: index with error but not conflict
Expand All @@ -293,18 +319,30 @@ function getReqRes(bulkOps: string): GetReqResResult {

if (ops[0] === '') return { bulkRequest, bulkResponse };

const createOp = { create: {} };

let id = 0;
for (const op of ops) {
id++;
switch (op) {
// create, ignored by the resolve logic
case 'c':
bulkRequest.operations.push(createOp, alertDoc);
// create with success
case 'cs':
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, false, 200));
break;

// index with conflict
case 'cc':
bulkResponse.errors = true;
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, true, 409));
break;

// index with error but not conflict
case 'ce':
bulkResponse.errors = true;
bulkRequest.operations.push(getCreateOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('create', id, true, 418)); // I'm a teapot
break;

// index with success
case 'is':
bulkRequest.operations.push(getIndexOp(id), alertDoc);
Expand Down Expand Up @@ -355,6 +393,16 @@ function getIndexOp(id: number) {
};
}

function getCreateOp(id: number) {
return {
create: {
_id: `id-${id}`,
_index: `index-${id}`,
require_alias: false,
},
};
}

function getBulkResponse(): BulkResponse {
return {
errors: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import {
BulkResponse,
BulkOperationContainer,
MgetResponseItem,
BulkCreateOperation,
BulkIndexOperation,
BulkUpdateOperation,
BulkDeleteOperation,
BulkOperationType,
BulkResponseItem,
} from '@elastic/elasticsearch/lib/api/types';

import { Logger, ElasticsearchClient } from '@kbn/core/server';
Expand Down Expand Up @@ -40,6 +46,13 @@ export interface ResolveAlertConflictsParams {
ruleType: string;
}

type BulkOperation =
| BulkCreateOperation
| BulkIndexOperation
| BulkUpdateOperation
| BulkDeleteOperation;

type BulkItem = Partial<Record<BulkOperationType, BulkResponseItem>>;
interface NormalizedBulkRequest {
op: BulkOperationContainer;
doc: unknown;
Expand Down Expand Up @@ -134,6 +147,28 @@ interface MakeBulkRequestResponse {
error?: Error;
}

function getBulkOperation(opContainer?: BulkOperationContainer): BulkOperation | undefined {
if (!opContainer) return undefined;

const operation =
opContainer.create || opContainer.index || opContainer.update || opContainer.delete;

if (!operation) {
throw new Error(`Missing bulk op in op container: ${JSON.stringify(opContainer)}`);
}
return operation;
}

function getItemInfoFromBulk(item?: BulkItem): BulkResponseItem | undefined {
if (!item) return undefined;

const info = item.create || item.index || item.update || item.delete;
if (!info) {
throw new Error(`Missing bulk op in bulk request: ${JSON.stringify(item)}`);
}
return info;
}

// make the bulk request to fix conflicts
async function makeBulkRequest(
esClient: ElasticsearchClient,
Expand All @@ -146,7 +181,7 @@ async function makeBulkRequest(

const bulkResponse = await esClient.bulk(updatedBulkRequest);

const errors = bulkResponse.items.filter((item) => item.index?.error).length;
const errors = bulkResponse.items.filter((item) => getItemInfoFromBulk(item)?.error).length;
return { bulkRequest, bulkResponse, errors };
}

Expand All @@ -156,7 +191,7 @@ async function refreshFieldsInDocs(
freshResponses: MgetResponseItem[]
) {
for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) {
if (!conflictRequest?.op.index || !freshResponse) continue;
if (!conflictRequest || !getBulkOperation(conflictRequest?.op) || !freshResponse) continue;

// @ts-expect-error @elastic/elasticsearch _source is not in the type!
const freshDoc = freshResponse._source;
Expand Down Expand Up @@ -190,7 +225,10 @@ async function refreshFieldsInDocs(
/** Update the OCC info in the conflict request with the fresh info. */
async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) {
for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) {
if (!req?.op.index || !freshDoc) continue;
if (!req) continue;

const bulkOperation = getBulkOperation(req?.op);
if (!bulkOperation || !freshDoc) continue;

// @ts-expect-error @elastic/elasticsearch _seq_no is not in the type!
const seqNo: number | undefined = freshDoc._seq_no;
Expand All @@ -200,8 +238,8 @@ async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: M
if (seqNo === undefined) throw new Error('Unexpected undefined seqNo');
if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm');

req.op.index.if_seq_no = seqNo;
req.op.index.if_primary_term = primaryTerm;
bulkOperation.if_seq_no = seqNo;
bulkOperation.if_primary_term = primaryTerm;
}
}

Expand All @@ -213,7 +251,8 @@ async function getFreshDocs(
const docs: Array<{ _id: string; _index: string }> = [];

conflictRequests.forEach((req) => {
const [id, index] = [req.op.index?._id, req.op.index?._index];
const bulkOperation = getBulkOperation(req?.op);
const [id, index] = [bulkOperation?._id, bulkOperation?._index];
if (!id || !index) return;

docs.push({ _id: id, _index: index });
Expand Down Expand Up @@ -245,9 +284,9 @@ function getConflictRequest(

if (request.length === 0) return [];

// we only want op: index where the status was 409 / conflict
// pick out just the conflicts (409)
const conflictRequest = zip(request, bulkResponse.items)
.filter(([_, res]) => res?.index?.status === 409)
.filter(([_, res]) => getItemInfoFromBulk(res)?.status === 409)
.map(([req, _]) => req!);

return conflictRequest;
Expand Down Expand Up @@ -292,9 +331,9 @@ function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult {
const sanitizedResponse = sanitizeBulkErrorResponse(bulkResponse) as BulkResponse;
const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] };
for (const item of sanitizedResponse.items) {
const op = item.create || item.index || item.update || item.delete;
const op = getItemInfoFromBulk(item);
if (op?.error) {
if (op?.status === 409 && op === item.index) {
if (op?.status === 409) {
stats.conflicts++;
} else {
stats.errors++;
Expand Down

0 comments on commit bd4cf3d

Please sign in to comment.