diff --git a/x-pack/plugins/security_solution/common/experimental_features.ts b/x-pack/plugins/security_solution/common/experimental_features.ts index 079ad8434f445..dd00c9b3ab190 100644 --- a/x-pack/plugins/security_solution/common/experimental_features.ts +++ b/x-pack/plugins/security_solution/common/experimental_features.ts @@ -117,6 +117,11 @@ export const allowedExperimentalValues = Object.freeze({ */ detectionsCoverageOverview: false, + /** + * Enable risk engine client and initialisation of datastream, component templates and mappings + */ + riskScoringPersistence: false, + /** * Enables experimental Entity Analytics HTTP endpoints */ diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/__mocks__/request_context.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/__mocks__/request_context.ts index 94e688a4db079..f45a57208e915 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/__mocks__/request_context.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/__mocks__/request_context.ts @@ -34,6 +34,7 @@ import type { import { getEndpointAuthzInitialStateMock } from '../../../../../common/endpoint/service/authz/mocks'; import type { EndpointAuthz } from '../../../../../common/endpoint/types/authz'; +import { riskEngineDataClientMock } from '../../../risk_engine/__mocks__/risk_engine_data_client_mock'; export const createMockClients = () => { const core = coreMock.createRequestHandlerContext(); @@ -61,6 +62,7 @@ export const createMockClients = () => { detectionEngineHealthClient: detectionEngineHealthClientMock.create(), ruleExecutionLog: ruleExecutionLogMock.forRoutes.create(), + riskEngineDataClient: riskEngineDataClientMock.create(), }; }; @@ -139,6 +141,7 @@ const createSecuritySolutionRequestContextMock = ( // TODO: Mock EndpointInternalFleetServicesInterface and return the mocked object. throw new Error('Not implemented'); }), + getRiskEngineDataClient: jest.fn(() => clients.riskEngineDataClient), }; }; diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/__mocks__/risk_engine_data_client_mock.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/__mocks__/risk_engine_data_client_mock.ts new file mode 100644 index 0000000000000..0e9f1fade7bb6 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/__mocks__/risk_engine_data_client_mock.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { RiskEngineDataClient } from '../risk_engine_data_client'; + +const createRiskEngineDataClientMock = () => + ({ + getWriter: jest.fn(), + initializeResources: jest.fn(), + } as unknown as jest.Mocked); + +export const riskEngineDataClientMock = { create: createRiskEngineDataClientMock }; diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/configurations.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/configurations.ts new file mode 100644 index 0000000000000..64b31b2c705a5 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/configurations.ts @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { FieldMap } from '@kbn/alerts-as-data-utils'; +import type { IIndexPatternString } from './utils/create_datastream'; + +export const ilmPolicy = { + _meta: { + managed: true, + }, + phases: { + hot: { + actions: { + rollover: { + max_age: '30d', + max_primary_shard_size: '50gb', + }, + }, + }, + }, +}; + +export const riskFieldMap: FieldMap = { + '@timestamp': { + type: 'date', + array: false, + required: false, + }, + identifierField: { + type: 'keyword', + array: false, + required: false, + }, + identifierValue: { + type: 'keyword', + array: false, + required: false, + }, + level: { + type: 'keyword', + array: false, + required: false, + }, + totalScore: { + type: 'float', + array: false, + required: false, + }, + totalScoreNormalized: { + type: 'float', + array: false, + required: false, + }, + alertsScore: { + type: 'float', + array: false, + required: false, + }, + otherScore: { + type: 'float', + array: false, + required: false, + }, + riskiestInputs: { + type: 'nested', + required: false, + }, + 'riskiestInputs.id': { + type: 'keyword', + array: false, + required: false, + }, + 'riskiestInputs.index': { + type: 'keyword', + array: false, + required: false, + }, + 'riskiestInputs.riskScore': { + type: 'float', + array: false, + required: false, + }, +} as const; + +export const ilmPolicyName = '.risk-score-ilm-policy'; +export const mappingComponentName = '.risk-score-mappings'; +export const totalFieldsLimit = 1000; + +const riskScoreBaseIndexName = 'risk-score'; + +export const getIndexPattern = (namespace: string): IIndexPatternString => ({ + template: `.${riskScoreBaseIndexName}.${riskScoreBaseIndexName}-${namespace}-index-template`, + alias: `${riskScoreBaseIndexName}.${riskScoreBaseIndexName}-${namespace}`, +}); diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.test.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.test.ts new file mode 100644 index 0000000000000..391d89b2ebf8b --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.test.ts @@ -0,0 +1,218 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + createOrUpdateComponentTemplate, + createOrUpdateIlmPolicy, + createOrUpdateIndexTemplate, +} from '@kbn/alerting-plugin/server'; +import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { RiskEngineDataClient } from './risk_engine_data_client'; +import { createDataStream } from './utils/create_datastream'; + +jest.mock('@kbn/alerting-plugin/server', () => ({ + createOrUpdateComponentTemplate: jest.fn(), + createOrUpdateIlmPolicy: jest.fn(), + createOrUpdateIndexTemplate: jest.fn(), +})); + +jest.mock('./utils/create_datastream', () => ({ + createDataStream: jest.fn(), +})); + +describe('RiskEngineDataClient', () => { + let riskEngineDataClient: RiskEngineDataClient; + let logger: ReturnType; + const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + const totalFieldsLimit = 1000; + + beforeEach(() => { + logger = loggingSystemMock.createLogger(); + const options = { + logger, + kibanaVersion: '8.9.0', + elasticsearchClientPromise: Promise.resolve(esClient), + }; + riskEngineDataClient = new RiskEngineDataClient(options); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('getWriter', () => { + it('should return a writer object', async () => { + const writer = await riskEngineDataClient.getWriter({ namespace: 'default' }); + expect(writer).toBeDefined(); + expect(typeof writer?.bulk).toBe('function'); + }); + + it('should cache and return the same writer for the same namespace', async () => { + const writer1 = await riskEngineDataClient.getWriter({ namespace: 'default' }); + const writer2 = await riskEngineDataClient.getWriter({ namespace: 'default' }); + const writer3 = await riskEngineDataClient.getWriter({ namespace: 'space-1' }); + + expect(writer1).toEqual(writer2); + expect(writer2).not.toEqual(writer3); + }); + + it('should cache writer and not call initializeResources for a second tme', async () => { + const initializeResourcesSpy = jest.spyOn(riskEngineDataClient, 'initializeResources'); + await riskEngineDataClient.getWriter({ namespace: 'default' }); + await riskEngineDataClient.getWriter({ namespace: 'default' }); + expect(initializeResourcesSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('initializeResources succes', () => { + it('should initialize risk engine resources', async () => { + await riskEngineDataClient.initializeResources({ namespace: 'default' }); + + expect(createOrUpdateIlmPolicy).toHaveBeenCalledWith({ + logger, + esClient, + name: '.risk-score-ilm-policy', + policy: { + _meta: { + managed: true, + }, + phases: { + hot: { + actions: { + rollover: { + max_age: '30d', + max_primary_shard_size: '50gb', + }, + }, + }, + }, + }, + }); + + expect(createOrUpdateComponentTemplate).toHaveBeenCalledWith({ + logger, + esClient, + template: { + name: '.risk-score-mappings', + _meta: { + managed: true, + }, + template: { + settings: {}, + mappings: { + dynamic: 'strict', + properties: { + '@timestamp': { + type: 'date', + }, + alertsScore: { + type: 'float', + }, + identifierField: { + type: 'keyword', + }, + identifierValue: { + type: 'keyword', + }, + level: { + type: 'keyword', + }, + otherScore: { + type: 'float', + }, + riskiestInputs: { + properties: { + id: { + type: 'keyword', + }, + index: { + type: 'keyword', + }, + riskScore: { + type: 'float', + }, + }, + type: 'nested', + }, + totalScore: { + type: 'float', + }, + totalScoreNormalized: { + type: 'float', + }, + }, + }, + }, + }, + totalFieldsLimit, + }); + + expect(createOrUpdateIndexTemplate).toHaveBeenCalledWith({ + logger, + esClient, + template: { + name: '.risk-score.risk-score-default-index-template', + body: { + data_stream: { hidden: true }, + index_patterns: ['risk-score.risk-score-default'], + composed_of: ['.risk-score-mappings'], + template: { + settings: { + auto_expand_replicas: '0-1', + hidden: true, + 'index.lifecycle': { + name: '.risk-score-ilm-policy', + }, + 'index.mapping.total_fields.limit': totalFieldsLimit, + }, + mappings: { + dynamic: false, + _meta: { + kibana: { + version: '8.9.0', + }, + managed: true, + namespace: 'default', + }, + }, + }, + _meta: { + kibana: { + version: '8.9.0', + }, + managed: true, + namespace: 'default', + }, + }, + }, + }); + + expect(createDataStream).toHaveBeenCalledWith({ + logger, + esClient, + totalFieldsLimit, + indexPatterns: { + template: `.risk-score.risk-score-default-index-template`, + alias: `risk-score.risk-score-default`, + }, + }); + }); + }); + + describe('initializeResources error', () => { + it('should handle errors during initialization', async () => { + const error = new Error('There error'); + (createOrUpdateIlmPolicy as jest.Mock).mockRejectedValue(error); + + await riskEngineDataClient.initializeResources({ namespace: 'default' }); + + expect(logger.error).toHaveBeenCalledWith( + `Error initializing risk engine resources: ${error.message}` + ); + }); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.ts new file mode 100644 index 0000000000000..9b77741bb164a --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/risk_engine_data_client.ts @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { Metadata } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import type { ClusterPutComponentTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; +import { + createOrUpdateComponentTemplate, + createOrUpdateIlmPolicy, + createOrUpdateIndexTemplate, +} from '@kbn/alerting-plugin/server'; +import { mappingFromFieldMap } from '@kbn/alerting-plugin/common'; +import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server'; +import type { Logger, ElasticsearchClient } from '@kbn/core/server'; +import { + riskFieldMap, + getIndexPattern, + totalFieldsLimit, + mappingComponentName, + ilmPolicyName, + ilmPolicy, +} from './configurations'; +import { createDataStream } from './utils/create_datastream'; + +interface InitializeRiskEngineResourcesOpts { + namespace?: string; +} + +interface RiskEngineDataClientOpts { + logger: Logger; + kibanaVersion: string; + elasticsearchClientPromise: Promise; +} + +interface Writer { + bulk: () => Promise; +} + +export class RiskEngineDataClient { + private writerCache: Map = new Map(); + constructor(private readonly options: RiskEngineDataClientOpts) {} + + public async getWriter({ namespace }: { namespace: string }): Promise { + if (this.writerCache.get(namespace)) { + return this.writerCache.get(namespace) as Writer; + } + + await this.initializeResources({ namespace }); + return this.writerCache.get(namespace) as Writer; + } + + private async initializeWriter(namespace: string): Promise { + const writer: Writer = { + bulk: async () => {}, + }; + this.writerCache.set(namespace, writer); + return writer; + } + + public async initializeResources({ + namespace = DEFAULT_NAMESPACE_STRING, + }: InitializeRiskEngineResourcesOpts) { + try { + const esClient = await this.options.elasticsearchClientPromise; + + const indexPatterns = getIndexPattern(namespace); + + const indexMetadata: Metadata = { + kibana: { + version: this.options.kibanaVersion, + }, + managed: true, + namespace, + }; + + await Promise.all([ + createOrUpdateIlmPolicy({ + logger: this.options.logger, + esClient, + name: ilmPolicyName, + policy: ilmPolicy, + }), + createOrUpdateComponentTemplate({ + logger: this.options.logger, + esClient, + template: { + name: mappingComponentName, + _meta: { + managed: true, + }, + template: { + settings: {}, + mappings: mappingFromFieldMap(riskFieldMap, 'strict'), + }, + } as ClusterPutComponentTemplateRequest, + totalFieldsLimit, + }), + ]); + + await createOrUpdateIndexTemplate({ + logger: this.options.logger, + esClient, + template: { + name: indexPatterns.template, + body: { + data_stream: { hidden: true }, + index_patterns: [indexPatterns.alias], + composed_of: [mappingComponentName], + template: { + settings: { + auto_expand_replicas: '0-1', + hidden: true, + 'index.lifecycle': { + name: ilmPolicyName, + }, + 'index.mapping.total_fields.limit': totalFieldsLimit, + }, + mappings: { + dynamic: false, + _meta: indexMetadata, + }, + }, + _meta: indexMetadata, + }, + }, + }); + + await createDataStream({ + logger: this.options.logger, + esClient, + totalFieldsLimit, + indexPatterns, + }); + + this.initializeWriter(namespace); + } catch (error) { + this.options.logger.error(`Error initializing risk engine resources: ${error.message}`); + } + } +} diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/utils/create_datastream.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/create_datastream.ts new file mode 100644 index 0000000000000..910ba5e887046 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/create_datastream.ts @@ -0,0 +1,206 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +// This file is a copy of x-pack/plugins/alerting/server/alerts_service/lib/create_concrete_write_index.ts +// original function create index instead of datastream, and their have plan to use datastream in the future +// so we probably should remove this file and use the original when datastream will be supported + +import type { IndicesSimulateIndexTemplateResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import type { Logger, ElasticsearchClient } from '@kbn/core/server'; +import { get } from 'lodash'; +import { retryTransientEsErrors } from './retry_transient_es_errors'; + +export interface IIndexPatternString { + template: string; + alias: string; +} + +interface ConcreteIndexInfo { + index: string; + alias: string; + isWriteIndex: boolean; +} + +interface UpdateIndexMappingsOpts { + logger: Logger; + esClient: ElasticsearchClient; + totalFieldsLimit: number; + concreteIndices: ConcreteIndexInfo[]; +} + +interface UpdateIndexOpts { + logger: Logger; + esClient: ElasticsearchClient; + totalFieldsLimit: number; + concreteIndexInfo: ConcreteIndexInfo; +} + +const updateTotalFieldLimitSetting = async ({ + logger, + esClient, + totalFieldsLimit, + concreteIndexInfo, +}: UpdateIndexOpts) => { + const { index, alias } = concreteIndexInfo; + try { + await retryTransientEsErrors( + () => + esClient.indices.putSettings({ + index, + body: { 'index.mapping.total_fields.limit': totalFieldsLimit }, + }), + { logger } + ); + return; + } catch (err) { + logger.error( + `Failed to PUT index.mapping.total_fields.limit settings for alias ${alias}: ${err.message}` + ); + throw err; + } +}; + +// This will update the mappings of backing indices but *not* the settings. This +// is due to the fact settings can be classed as dynamic and static, and static +// updates will fail on an index that isn't closed. New settings *will* be applied as part +// of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654 +const updateUnderlyingMapping = async ({ + logger, + esClient, + concreteIndexInfo, +}: UpdateIndexOpts) => { + const { index, alias } = concreteIndexInfo; + let simulatedIndexMapping: IndicesSimulateIndexTemplateResponse; + try { + simulatedIndexMapping = await retryTransientEsErrors( + () => esClient.indices.simulateIndexTemplate({ name: index }), + { logger } + ); + } catch (err) { + logger.error( + `Ignored PUT mappings for alias ${alias}; error generating simulated mappings: ${err.message}` + ); + return; + } + + const simulatedMapping = get(simulatedIndexMapping, ['template', 'mappings']); + + if (simulatedMapping == null) { + logger.error(`Ignored PUT mappings for alias ${alias}; simulated mappings were empty`); + return; + } + + try { + await retryTransientEsErrors( + () => esClient.indices.putMapping({ index, body: simulatedMapping }), + { logger } + ); + + return; + } catch (err) { + logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`); + throw err; + } +}; +/** + * Updates the underlying mapping for any existing concrete indices + */ +const updateIndexMappings = async ({ + logger, + esClient, + totalFieldsLimit, + concreteIndices, +}: UpdateIndexMappingsOpts) => { + logger.debug(`Updating underlying mappings for ${concreteIndices.length} indices.`); + + // Update total field limit setting of found indices + // Other index setting changes are not updated at this time + await Promise.all( + concreteIndices.map((index) => + updateTotalFieldLimitSetting({ logger, esClient, totalFieldsLimit, concreteIndexInfo: index }) + ) + ); + + // Update mappings of the found indices. + await Promise.all( + concreteIndices.map((index) => + updateUnderlyingMapping({ logger, esClient, totalFieldsLimit, concreteIndexInfo: index }) + ) + ); +}; + +interface CreateConcreteWriteIndexOpts { + logger: Logger; + esClient: ElasticsearchClient; + totalFieldsLimit: number; + indexPatterns: IIndexPatternString; +} +/** + * Create a data stream + */ +export const createDataStream = async ({ + logger, + esClient, + indexPatterns, + totalFieldsLimit, +}: CreateConcreteWriteIndexOpts) => { + logger.info(`Creating data stream - ${indexPatterns.alias}`); + + // check if a datastream already exists + let dataStreams: ConcreteIndexInfo[] = []; + try { + // Specify both the index pattern for the backing indices and their aliases + // The alias prevents the request from finding other namespaces that could match the -* pattern + const response = await retryTransientEsErrors( + () => esClient.indices.getDataStream({ name: indexPatterns.alias, expand_wildcards: 'all' }), + { logger } + ); + + dataStreams = response.data_streams.map((dataStream) => ({ + index: dataStream.name, + alias: dataStream.name, + isWriteIndex: true, + })); + + logger.debug( + `Found ${dataStreams.length} concrete indices for ${indexPatterns.alias} - ${JSON.stringify( + dataStreams + )}` + ); + } catch (error) { + // 404 is expected if no datastream have been created + if (error.statusCode !== 404) { + logger.error( + `Error fetching concrete indices for ${indexPatterns.alias} pattern - ${error.message}` + ); + throw error; + } + } + + const dataStreamsExist = dataStreams.length > 0; + + // if a concrete write datastream already exists, update the underlying mapping + if (dataStreams.length > 0) { + await updateIndexMappings({ logger, esClient, totalFieldsLimit, concreteIndices: dataStreams }); + } + + // check if a concrete write datastream already exists + if (!dataStreamsExist) { + try { + await retryTransientEsErrors( + () => + esClient.indices.createDataStream({ + name: indexPatterns.alias, + }), + { logger } + ); + } catch (error) { + logger.error(`Error creating datastream - ${error.message}`); + throw error; + } + } +}; diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.test.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.test.ts new file mode 100644 index 0000000000000..2501c57776d80 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.test.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { loggerMock } from '@kbn/logging-mocks'; +import { errors as EsErrors } from '@elastic/elasticsearch'; + +import { retryTransientEsErrors } from './retry_transient_es_errors'; + +const logger = loggerMock.create(); +const randomDelayMultiplier = 0.01; + +describe('retryTransientErrors', () => { + beforeEach(() => { + jest.resetAllMocks(); + jest.spyOn(global.Math, 'random').mockReturnValue(randomDelayMultiplier); + }); + + it("doesn't retry if operation is successful", async () => { + const esCallMock = jest.fn().mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock, { logger })).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); + + it('logs a warning message on retry', async () => { + const esCallMock = jest + .fn() + .mockRejectedValueOnce(new EsErrors.ConnectionError('foo')) + .mockResolvedValue('success'); + + await retryTransientEsErrors(esCallMock, { logger }); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatch( + `Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo` + ); + }); + + it('retries with an exponential backoff', async () => { + let attempt = 0; + const esCallMock = jest.fn(async () => { + attempt++; + if (attempt < 4) { + throw new EsErrors.ConnectionError('foo'); + } else { + return 'success'; + } + }); + + expect(await retryTransientEsErrors(esCallMock, { logger })).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(4); + expect(logger.warn).toHaveBeenCalledTimes(3); + expect(logger.warn.mock.calls[0][0]).toMatch( + `Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo` + ); + expect(logger.warn.mock.calls[1][0]).toMatch( + `Retrying Elasticsearch operation after [4s] due to error: ConnectionError: foo ConnectionError: foo` + ); + expect(logger.warn.mock.calls[2][0]).toMatch( + `Retrying Elasticsearch operation after [8s] due to error: ConnectionError: foo ConnectionError: foo` + ); + }); + + it('retries each supported error type', async () => { + const errors = [ + new EsErrors.NoLivingConnectionsError('no living connection', { + warnings: [], + // eslint-disable-next-line @typescript-eslint/no-explicit-any + meta: {} as any, + }), + new EsErrors.ConnectionError('no connection'), + new EsErrors.TimeoutError('timeout'), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new EsErrors.ResponseError({ statusCode: 503, meta: {} as any, warnings: [] }), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new EsErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] }), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new EsErrors.ResponseError({ statusCode: 410, meta: {} as any, warnings: [] }), + ]; + + for (const error of errors) { + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock, { logger })).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(2); + } + }); + + it('does not retry unsupported errors', async () => { + const error = new Error('foo!'); + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + await expect(retryTransientEsErrors(esCallMock, { logger })).rejects.toThrow(error); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.ts b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.ts new file mode 100644 index 0000000000000..7a3839ad3c5bc --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/risk_engine/utils/retry_transient_es_errors.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/core/server'; +import { errors as EsErrors } from '@elastic/elasticsearch'; + +const MAX_ATTEMPTS = 3; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 408, // RequestTimeout + 410, // Gone +]; + +const isRetryableError = (e: Error) => + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + (e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!)); + +const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +export const retryTransientEsErrors = async ( + esCall: () => Promise, + { + logger, + attempt = 0, + }: { + logger: Logger; + attempt?: number; + } +): Promise => { + try { + return await esCall(); + } catch (e) { + if (attempt < MAX_ATTEMPTS && isRetryableError(e)) { + const retryCount = attempt + 1; + const retryDelaySec: number = Math.min(Math.pow(2, retryCount), 30); // 2s, 4s, 8s, 16s, 30s, 30s, 30s... + + logger.warn( + `Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${ + e.stack + }` + ); + + // delay with some randomness + await delay(retryDelaySec * 1000 * Math.random()); + return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); + } + + throw e; + } +}; diff --git a/x-pack/plugins/security_solution/server/plugin.ts b/x-pack/plugins/security_solution/server/plugin.ts index a838c83a31e2b..b9dce72f64671 100644 --- a/x-pack/plugins/security_solution/server/plugin.ts +++ b/x-pack/plugins/security_solution/server/plugin.ts @@ -94,6 +94,8 @@ import { ENDPOINT_FIELDS_SEARCH_STRATEGY, ENDPOINT_SEARCH_STRATEGY, } from '../common/endpoint/constants'; +import { RiskEngineDataClient } from './lib/risk_engine/risk_engine_data_client'; + import { AppFeatures } from './lib/app_features'; export type { SetupPlugins, StartPlugins, PluginSetup, PluginStart } from './plugin_contract'; @@ -118,6 +120,7 @@ export class Plugin implements ISecuritySolutionPlugin { private checkMetadataTransformsTask: CheckMetadataTransformsTask | undefined; private telemetryUsageCounter?: UsageCounter; private endpointContext: EndpointAppContext; + private riskEngineDataClient: RiskEngineDataClient | undefined; constructor(context: PluginInitializerContext) { const serverConfig = createConfig(context); @@ -159,6 +162,18 @@ export class Plugin implements ISecuritySolutionPlugin { this.ruleMonitoringService.setup(core, plugins); + this.riskEngineDataClient = new RiskEngineDataClient({ + logger: this.logger, + kibanaVersion: this.pluginContext.env.packageInfo.version, + elasticsearchClientPromise: core + .getStartServices() + .then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser), + }); + + if (experimentalFeatures.riskScoringPersistence) { + this.riskEngineDataClient.initializeResources({}); + } + const requestContextFactory = new RequestContextFactory({ config, logger, @@ -168,6 +183,7 @@ export class Plugin implements ISecuritySolutionPlugin { ruleMonitoringService: this.ruleMonitoringService, kibanaVersion: pluginContext.env.packageInfo.version, kibanaBranch: pluginContext.env.packageInfo.branch, + riskEngineDataClient: this.riskEngineDataClient, }); const router = core.http.createRouter(); diff --git a/x-pack/plugins/security_solution/server/request_context_factory.ts b/x-pack/plugins/security_solution/server/request_context_factory.ts index e56df95a10650..c5cf4b24750a4 100644 --- a/x-pack/plugins/security_solution/server/request_context_factory.ts +++ b/x-pack/plugins/security_solution/server/request_context_factory.ts @@ -25,6 +25,7 @@ import type { import type { Immutable } from '../common/endpoint/types'; import type { EndpointAuthz } from '../common/endpoint/types/authz'; import type { EndpointAppContextService } from './endpoint/endpoint_app_context_services'; +import type { RiskEngineDataClient } from './lib/risk_engine/risk_engine_data_client'; export interface IRequestContextFactory { create( @@ -42,6 +43,7 @@ interface ConstructorOptions { ruleMonitoringService: IRuleMonitoringService; kibanaVersion: string; kibanaBranch: string; + riskEngineDataClient: RiskEngineDataClient; } export class RequestContextFactory implements IRequestContextFactory { @@ -56,7 +58,15 @@ export class RequestContextFactory implements IRequestContextFactory { request: KibanaRequest ): Promise { const { options, appClientFactory } = this; - const { config, core, plugins, endpointAppContextService, ruleMonitoringService } = options; + const { + config, + core, + plugins, + endpointAppContextService, + ruleMonitoringService, + riskEngineDataClient, + } = options; + const { lists, ruleRegistry, security } = plugins; const [, startPlugins] = await core.getStartServices(); @@ -128,6 +138,8 @@ export class RequestContextFactory implements IRequestContextFactory { }, getInternalFleetServices: memoize(() => endpointAppContextService.getInternalFleetServices()), + + getRiskEngineDataClient: () => riskEngineDataClient, }; } } diff --git a/x-pack/plugins/security_solution/server/types.ts b/x-pack/plugins/security_solution/server/types.ts index 64211998defc5..8326d13ad03a7 100644 --- a/x-pack/plugins/security_solution/server/types.ts +++ b/x-pack/plugins/security_solution/server/types.ts @@ -29,6 +29,7 @@ import type { import type { FrameworkRequest } from './lib/framework'; import type { EndpointAuthz } from '../common/endpoint/types/authz'; import type { EndpointInternalFleetServicesInterface } from './endpoint/services/fleet'; +import type { RiskEngineDataClient } from './lib/risk_engine/risk_engine_data_client'; export { AppClient }; @@ -46,6 +47,7 @@ export interface SecuritySolutionApiRequestHandlerContext { getRacClient: (req: KibanaRequest) => Promise; getExceptionListClient: () => ExceptionListClient | null; getInternalFleetServices: () => EndpointInternalFleetServicesInterface; + getRiskEngineDataClient: () => RiskEngineDataClient; } export type SecuritySolutionRequestHandlerContext = CustomRequestHandlerContext<{ diff --git a/x-pack/test/detection_engine_api_integration/common/config.ts b/x-pack/test/detection_engine_api_integration/common/config.ts index 8454915db9a7d..c4c3c44f1c418 100644 --- a/x-pack/test/detection_engine_api_integration/common/config.ts +++ b/x-pack/test/detection_engine_api_integration/common/config.ts @@ -76,6 +76,7 @@ export function createTestConfig(options: CreateTestConfigOptions, testFiles?: s '--xpack.ruleRegistry.unsafe.legacyMultiTenancy.enabled=true', `--xpack.securitySolution.enableExperimental=${JSON.stringify([ 'previewTelemetryUrlEnabled', + 'riskScoringPersistence', 'riskScoringRoutesEnabled', ])}`, '--xpack.task_manager.poll_interval=1000', diff --git a/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/index.ts b/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/index.ts index c8501050fc25c..6841648145fd3 100644 --- a/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/index.ts +++ b/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/index.ts @@ -37,6 +37,7 @@ export default ({ loadTestFile }: FtrProviderContext): void => { loadTestFile(require.resolve('./throttle')); loadTestFile(require.resolve('./ignore_fields')); loadTestFile(require.resolve('./migrations')); + loadTestFile(require.resolve('./risk_engine_install_resources')); loadTestFile(require.resolve('./risk_engine')); loadTestFile(require.resolve('./set_alert_tags')); }); diff --git a/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/risk_engine_install_resources.ts b/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/risk_engine_install_resources.ts new file mode 100644 index 0000000000000..a7cae20fa8b34 --- /dev/null +++ b/x-pack/test/detection_engine_api_integration/security_and_spaces/group10/risk_engine_install_resources.ts @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../common/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default ({ getService }: FtrProviderContext) => { + const es = getService('es'); + + describe('install risk engine resources', () => { + it('should install resources on startup', async () => { + const ilmPolicyName = '.risk-score-ilm-policy'; + const componentTemplateName = '.risk-score-mappings'; + const indexTemplateName = '.risk-score.risk-score-default-index-template'; + const indexName = 'risk-score.risk-score-default'; + + const ilmPolicy = await es.ilm.getLifecycle({ + name: ilmPolicyName, + }); + + expect(ilmPolicy[ilmPolicyName].policy).to.eql({ + _meta: { + managed: true, + }, + phases: { + hot: { + min_age: '0ms', + actions: { + rollover: { + max_age: '30d', + max_primary_shard_size: '50gb', + }, + }, + }, + }, + }); + + const { component_templates: componentTemplates1 } = await es.cluster.getComponentTemplate({ + name: componentTemplateName, + }); + + expect(componentTemplates1.length).to.eql(1); + const componentTemplate = componentTemplates1[0]; + + expect(componentTemplate.name).to.eql(componentTemplateName); + expect(componentTemplate.component_template.template.mappings).to.eql({ + dynamic: 'strict', + properties: { + '@timestamp': { + type: 'date', + }, + alertsScore: { + type: 'float', + }, + identifierField: { + type: 'keyword', + }, + identifierValue: { + type: 'keyword', + }, + level: { + type: 'keyword', + }, + otherScore: { + type: 'float', + }, + riskiestInputs: { + properties: { + id: { + type: 'keyword', + }, + index: { + type: 'keyword', + }, + riskScore: { + type: 'float', + }, + }, + type: 'nested', + }, + totalScore: { + type: 'float', + }, + totalScoreNormalized: { + type: 'float', + }, + }, + }); + + const { index_templates: indexTemplates } = await es.indices.getIndexTemplate({ + name: indexTemplateName, + }); + expect(indexTemplates.length).to.eql(1); + const indexTemplate = indexTemplates[0]; + expect(indexTemplate.name).to.eql(indexTemplateName); + expect(indexTemplate.index_template.index_patterns).to.eql(['risk-score.risk-score-default']); + expect(indexTemplate.index_template.composed_of).to.eql(['.risk-score-mappings']); + expect(indexTemplate.index_template.template!.mappings?.dynamic).to.eql(false); + expect(indexTemplate.index_template.template!.mappings?._meta?.managed).to.eql(true); + expect(indexTemplate.index_template.template!.mappings?._meta?.namespace).to.eql('default'); + expect(indexTemplate.index_template.template!.mappings?._meta?.kibana?.version).to.be.a( + 'string' + ); + expect(indexTemplate.index_template.template!.settings).to.eql({ + index: { + lifecycle: { + name: '.risk-score-ilm-policy', + }, + mapping: { + total_fields: { + limit: '1000', + }, + }, + hidden: 'true', + auto_expand_replicas: '0-1', + }, + }); + + const dsResponse = await es.indices.get({ + index: indexName, + }); + + const dataStream = Object.values(dsResponse).find((ds) => ds.data_stream === indexName); + + expect(dataStream?.mappings?._meta?.managed).to.eql(true); + expect(dataStream?.mappings?._meta?.namespace).to.eql('default'); + expect(dataStream?.mappings?._meta?.kibana?.version).to.be.a('string'); + expect(dataStream?.mappings?.dynamic).to.eql('false'); + + expect(dataStream?.settings?.index?.lifecycle).to.eql({ + name: '.risk-score-ilm-policy', + }); + + expect(dataStream?.settings?.index?.mapping).to.eql({ + total_fields: { + limit: '1000', + }, + }); + + expect(dataStream?.settings?.index?.hidden).to.eql('true'); + expect(dataStream?.settings?.index?.number_of_shards).to.eql(1); + expect(dataStream?.settings?.index?.auto_expand_replicas).to.eql('0-1'); + }); + }); +};