Skip to content

Commit

Permalink
[Security Solution] Dedupe alerts by querying _id before creation (#1…
Browse files Browse the repository at this point in the history
…19045)

* Dedupe alerts by querying _id before creation

* Update alert chunk size

* Use aggregations to find existing alert _ids

* Remove tightly coupled tests

* Add api integration test for alert deduplication

* Remove unused import

* Cleaner util implementation

* Skip flaky test

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
marshallmain and kibanamachine authored Nov 18, 2021
1 parent fd81bf5 commit 8c8c62e
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
* 2.0.
*/

import { VERSION } from '@kbn/rule-data-utils';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { chunk } from 'lodash';
import { ALERT_UUID, VERSION } from '@kbn/rule-data-utils';
import { getCommonAlertFields } from './get_common_alert_fields';
import { CreatePersistenceRuleTypeWrapper } from './persistence_types';

Expand All @@ -26,22 +28,87 @@ export const createPersistenceRuleTypeWrapper: CreatePersistenceRuleTypeWrapper
if (ruleDataClient.isWriteEnabled() && numAlerts) {
const commonRuleFields = getCommonAlertFields(options);

const CHUNK_SIZE = 10000;
const alertChunks = chunk(alerts, CHUNK_SIZE);
const filteredAlerts: typeof alerts = [];

for (const alertChunk of alertChunks) {
const request: estypes.SearchRequest = {
body: {
query: {
ids: {
values: alertChunk.map((alert) => alert._id),
},
},
aggs: {
uuids: {
terms: {
field: ALERT_UUID,
size: CHUNK_SIZE,
},
},
},
size: 0,
},
};
const response = await ruleDataClient
.getReader({ namespace: options.spaceId })
.search(request);
const uuidsMap: Record<string, boolean> = {};
const aggs = response.aggregations as
| Record<estypes.AggregateName, { buckets: Array<{ key: string }> }>
| undefined;
if (aggs != null) {
aggs.uuids.buckets.forEach((bucket) => (uuidsMap[bucket.key] = true));
const newAlerts = alertChunk.filter((alert) => !uuidsMap[alert._id]);
filteredAlerts.push(...newAlerts);
} else {
filteredAlerts.push(...alertChunk);
}
}

if (filteredAlerts.length === 0) {
return { createdAlerts: [] };
}

const augmentedAlerts = filteredAlerts.map((alert) => {
return {
...alert,
_source: {
[VERSION]: ruleDataClient.kibanaVersion,
...commonRuleFields,
...alert._source,
},
};
});

const response = await ruleDataClient
.getWriter({ namespace: options.spaceId })
.bulk({
body: alerts.flatMap((alert) => [
{ index: { _id: alert.id } },
{
[VERSION]: ruleDataClient.kibanaVersion,
...commonRuleFields,
...alert.fields,
},
body: augmentedAlerts.flatMap((alert) => [
{ create: { _id: alert._id } },
alert._source,
]),
refresh,
});
return response;

if (response == null) {
return { createdAlerts: [] };
}

return {
createdAlerts: augmentedAlerts.map((alert, idx) => {
const responseItem = response.body.items[idx].create;
return {
_id: responseItem?._id ?? '',
_index: responseItem?._index ?? '',
...alert._source,
};
}),
};
} else {
logger.debug('Writing is disabled.');
return { createdAlerts: [] };
}
},
},
Expand Down
14 changes: 8 additions & 6 deletions x-pack/plugins/rule_registry/server/utils/persistence_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
* 2.0.
*/

import type { TransportResult } from '@elastic/elasticsearch';
import { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { Logger } from '@kbn/logging';
import {
AlertExecutorOptions,
Expand All @@ -19,13 +17,17 @@ import {
import { WithoutReservedActionGroups } from '../../../alerting/common';
import { IRuleDataClient } from '../rule_data_client';

export type PersistenceAlertService = (
export type PersistenceAlertService = <T>(
alerts: Array<{
id: string;
fields: Record<string, unknown>;
_id: string;
_source: T;
}>,
refresh: boolean | 'wait_for'
) => Promise<TransportResult<BulkResponse, unknown> | undefined>;
) => Promise<PersistenceAlertServiceResult<T>>;

export interface PersistenceAlertServiceResult<T> {
createdAlerts: Array<T & { _id: string; _index: string }>;
}

export interface PersistenceServices {
alertWithPersistence: PersistenceAlertService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
*/

import { performance } from 'perf_hooks';
import { countBy, isEmpty } from 'lodash';

import { Logger } from 'kibana/server';
import { BaseHit } from '../../../../../common/detection_engine/types';
import { BuildRuleMessage } from '../../signals/rule_messages';
import { errorAggregator, makeFloatString } from '../../signals/utils';
import { makeFloatString } from '../../signals/utils';
import { RefreshTypes } from '../../types';
import { PersistenceAlertService } from '../../../../../../rule_registry/server';

Expand Down Expand Up @@ -45,11 +44,11 @@ export const bulkCreateFactory =

const start = performance.now();

const response = await alertWithPersistence(
const { createdAlerts } = await alertWithPersistence(
wrappedDocs.map((doc) => ({
id: doc._id,
_id: doc._id,
// `fields` should have already been merged into `doc._source`
fields: doc._source,
_source: doc._source,
})),
refreshForBulkCreate
);
Expand All @@ -62,64 +61,11 @@ export const bulkCreateFactory =
)
);

if (response == null) {
return {
errors: [
'alertWithPersistence returned undefined response. Alerts as Data write flag may be disabled.',
],
success: false,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: 0,
createdItems: [],
};
}

logger.debug(
buildRuleMessage(`took property says bulk took: ${response.body.took} milliseconds`)
);

const createdItems = wrappedDocs
.map((doc, index) => {
const responseIndex = response.body.items[index].index;
return {
_id: responseIndex?._id ?? '',
_index: responseIndex?._index ?? '',
...doc._source,
};
})
.filter((_, index) => response.body.items[index].index?.status === 201);
const createdItemsCount = createdItems.length;

const duplicateSignalsCount = countBy(response.body.items, 'create.status')['409'];
const errorCountByMessage = errorAggregator(response.body, [409]);

logger.debug(buildRuleMessage(`bulk created ${createdItemsCount} signals`));

if (duplicateSignalsCount > 0) {
logger.debug(buildRuleMessage(`ignored ${duplicateSignalsCount} duplicate signals`));
}

if (!isEmpty(errorCountByMessage)) {
logger.error(
buildRuleMessage(
`[-] bulkResponse had errors with responses of: ${JSON.stringify(errorCountByMessage)}`
)
);

return {
errors: Object.keys(errorCountByMessage),
success: false,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
};
} else {
return {
errors: [],
success: true,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
};
}
return {
errors: [],
success: true,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdAlerts.length,
createdItems: createdAlerts,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { allowedExperimentalValues } from '../../../../../common/experimental_fe
import { createRuleTypeMocks } from '../__mocks__/rule_type';
import { createIndicatorMatchAlertType } from './create_indicator_match_alert_type';
import { sampleDocNoSortId } from '../../signals/__mocks__/es_results';
import { CountResponse } from 'kibana/server';
import { RuleParams } from '../../schemas/rule_schemas';
import { createSecurityRuleTypeWrapper } from '../create_security_rule_type_wrapper';
import { createMockConfig } from '../../routes/__mocks__';
Expand Down Expand Up @@ -133,121 +132,4 @@ describe('Indicator Match Alerts', () => {
await executor({ params });
expect(dependencies.ruleDataClient.getWriter).not.toBeCalled();
});

it('sends an alert when enrichments are found', async () => {
const indicatorMatchAlertType = securityRuleTypeWrapper(
createIndicatorMatchAlertType({
experimentalFeatures: allowedExperimentalValues,
logger: dependencies.logger,
version: '1.0.0',
})
);

dependencies.alerting.registerType(indicatorMatchAlertType);

// threat list count
services.scopedClusterClient.asCurrentUser.count.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ count: 1 } as CountResponse)
);

services.scopedClusterClient.asCurrentUser.search.mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [
{
...sampleDocNoSortId(v4()),
_source: {
...sampleDocNoSortId(v4())._source,
'threat.indicator.file.hash.md5': 'a1b2c3',
},
fields: {
...sampleDocNoSortId(v4()).fields,
'threat.indicator.file.hash.md5': ['a1b2c3'],
},
},
],
total: {
relation: 'eq',
value: 1,
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);

services.scopedClusterClient.asCurrentUser.search.mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [
{
...sampleDocNoSortId(v4()),
_source: {
...sampleDocNoSortId(v4())._source,
'file.hash.md5': 'a1b2c3',
},
fields: {
...sampleDocNoSortId(v4()).fields,
'file.hash.md5': ['a1b2c3'],
},
},
],
total: {
relation: 'eq',
value: 1,
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);

services.scopedClusterClient.asCurrentUser.search.mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [
{
...sampleDocNoSortId(v4()),
_source: {
...sampleDocNoSortId(v4())._source,
'file.hash.md5': 'a1b2c3',
},
fields: {
...sampleDocNoSortId(v4()).fields,
'file.hash.md5': ['a1b2c3'],
},
},
],
total: {
relation: 'eq',
value: 1,
},
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
skipped: 0,
successful: 1,
total: 1,
},
})
);

await executor({ params });

expect(dependencies.ruleDataClient.getWriter).toBeCalled();
});
});
Loading

0 comments on commit 8c8c62e

Please sign in to comment.