Skip to content

Commit

Permalink
Add transforms creation to the CSP plugin initialization (#129905)
Browse files Browse the repository at this point in the history
* add transforms

* [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix'

* catch 404

* add hyphen

* start transforms, promise.all

* return is clearer

* add tests

* transform rename

* add test

* use exact pattern

* only start if created

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
eyalkraft and kibanamachine authored Apr 18, 2022
1 parent 509f1da commit d55c7b3
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 16 deletions.
6 changes: 5 additions & 1 deletion x-pack/plugins/cloud_security_posture/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ export const UPDATE_RULES_CONFIG_ROUTE_PATH =

export const CSP_FINDINGS_INDEX_NAME = 'findings';
export const CIS_KUBERNETES_PACKAGE_NAME = 'cis_kubernetes_benchmark';
export const FINDINGS_DATA_STREAM_NAME =
// Currently 'cis_kubernetes_benchmark.findings', To be refactored to 'cloud_security_posture.findings'
CIS_KUBERNETES_PACKAGE_NAME + '.' + CSP_FINDINGS_INDEX_NAME;
export const LATEST_FINDINGS_INDEX_NAME = 'cloud_security_posture.findings_latest';
export const BENCHMARK_SCORE_INDEX_NAME = 'cloud_security_posture.scores';

export const AGENT_LOGS_INDEX_PATTERN = '.logs-cis_kubernetes_benchmark.metadata*';
export const CSP_KUBEBEAT_INDEX_PATTERN = 'logs-cis_kubernetes_benchmark.findings*';
export const CSP_KUBEBEAT_INDEX_PATTERN = 'logs-cis_kubernetes_benchmark.findings-*';
export const FINDINGS_INDEX_PATTERN = 'logs-' + FINDINGS_DATA_STREAM_NAME + '-default';
export const LATEST_FINDINGS_INDEX_PATTERN = 'logs-' + LATEST_FINDINGS_INDEX_NAME + '-default';
export const BENCHMARK_SCORE_INDEX_PATTERN = 'logs-' + BENCHMARK_SCORE_INDEX_NAME + '-default';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ export const initializeCspTransformsIndices = async (
esClient: ElasticsearchClient,
logger: Logger
) => {
createIndexIfNotExists(
esClient,
LATEST_FINDINGS_INDEX_NAME,
LATEST_FINDINGS_INDEX_PATTERN,
latestFindingsMapping,
logger
);
createIndexIfNotExists(
esClient,
BENCHMARK_SCORE_INDEX_NAME,
BENCHMARK_SCORE_INDEX_PATTERN,
benchmarkScoreMapping,
logger
);
return Promise.all([
createIndexIfNotExists(
esClient,
LATEST_FINDINGS_INDEX_NAME,
LATEST_FINDINGS_INDEX_PATTERN,
latestFindingsMapping,
logger
),
createIndexIfNotExists(
esClient,
BENCHMARK_SCORE_INDEX_NAME,
BENCHMARK_SCORE_INDEX_PATTERN,
benchmarkScoreMapping,
logger
),
]);
};

export const createIndexIfNotExists = async (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { TransformPutTransformRequest } from '@elastic/elasticsearch/lib/api/types';
import {
LATEST_FINDINGS_INDEX_PATTERN,
BENCHMARK_SCORE_INDEX_PATTERN,
} from '../../common/constants';

export const benchmarkScoreTransform: TransformPutTransformRequest = {
transform_id: 'cloud_security_posture.score-default-0.0.1',
description: 'Calculate latest findings score',
source: {
index: LATEST_FINDINGS_INDEX_PATTERN,
},
dest: {
index: BENCHMARK_SCORE_INDEX_PATTERN,
},
frequency: '30m',
sync: {
time: {
field: 'event.ingested',
delay: '60s',
},
},
retention_policy: {
time: {
field: '@timestamp',
max_age: '30d',
},
},
pivot: {
group_by: {
'@timestamp': {
date_histogram: {
field: '@timestamp',
calendar_interval: '1m',
},
},
},
aggregations: {
total_findings: {
value_count: {
field: 'result.evaluation.keyword',
},
},
passed_findings: {
filter: {
term: {
'result.evaluation.keyword': 'passed',
},
},
},
failed_findings: {
filter: {
term: {
'result.evaluation.keyword': 'failed',
},
},
},
score_by_cluster_id: {
terms: {
field: 'cluster_id.keyword',
},
aggregations: {
total_findings: {
value_count: {
field: 'result.evaluation.keyword',
},
},
passed_findings: {
filter: {
term: {
'result.evaluation.keyword': 'passed',
},
},
},
failed_findings: {
filter: {
term: {
'result.evaluation.keyword': 'failed',
},
},
},
},
},
},
},
_meta: {
managed: 'true',
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { elasticsearchClientMock } from '@kbn/core/server/elasticsearch/client/mocks';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { createTransformIfNotExists, startTransformIfNotStarted } from './create_transforms';
import { latestFindingsTransform } from './latest_findings_transform';

const mockEsClient = elasticsearchClientMock.createClusterClient().asScoped().asInternalUser;

describe('createTransformIfNotExist', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

beforeEach(() => {
logger = loggingSystemMock.createLogger();
jest.resetAllMocks();
});

it('expect not to create if already exists', async () => {
mockEsClient.transform.getTransform.mockResolvedValue({ transforms: [], count: 1 });
await createTransformIfNotExists(mockEsClient, latestFindingsTransform, logger);
expect(mockEsClient.transform.getTransform).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.getTransform).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
expect(mockEsClient.transform.putTransform).toHaveBeenCalledTimes(0);
});

it('expect to create if does not already exist', async () => {
mockEsClient.transform.getTransform.mockRejectedValue({ statusCode: 404 });
await createTransformIfNotExists(mockEsClient, latestFindingsTransform, logger);
expect(mockEsClient.transform.getTransform).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.getTransform).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
expect(mockEsClient.transform.putTransform).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.putTransform).toHaveBeenCalledWith(latestFindingsTransform);
});

it('expect not to create if get error is not 404', async () => {
mockEsClient.transform.getTransform.mockRejectedValue({ statusCode: 400 });
await createTransformIfNotExists(mockEsClient, latestFindingsTransform, logger);
expect(mockEsClient.transform.getTransform).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.putTransform).toHaveBeenCalledTimes(0);
});
});

function getTransformWithState(state: string) {
return {
state,
checkpointing: { last: { checkpoint: 1 } },
id: '',
stats: {
documents_indexed: 0,
documents_processed: 0,
exponential_avg_checkpoint_duration_ms: 0,
exponential_avg_documents_indexed: 0,
exponential_avg_documents_processed: 0,
index_failures: 0,
index_time_in_ms: 0,
index_total: 0,
pages_processed: 0,
processing_time_in_ms: 0,
processing_total: 0,
search_failures: 0,
search_time_in_ms: 0,
search_total: 0,
trigger_count: 0,
},
};
}

describe('startTransformIfNotStarted', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

beforeEach(() => {
logger = loggingSystemMock.createLogger();
jest.resetAllMocks();
});

['failed', 'stopping', 'started', 'aborting', 'indexing'].forEach((state) =>
it(`expect not to start if state is ${state}`, async () => {
mockEsClient.transform.getTransformStats.mockResolvedValue({
transforms: [getTransformWithState(state)],
count: 1,
});
await startTransformIfNotStarted(mockEsClient, latestFindingsTransform.transform_id, logger);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
expect(mockEsClient.transform.startTransform).toHaveBeenCalledTimes(0);
})
);

it('expect not to start if transform not found', async () => {
mockEsClient.transform.getTransformStats.mockResolvedValue({
transforms: [],
count: 0,
});
await startTransformIfNotStarted(mockEsClient, latestFindingsTransform.transform_id, logger);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
expect(mockEsClient.transform.startTransform).toHaveBeenCalledTimes(0);
});

it('expect to start if state is stopped', async () => {
mockEsClient.transform.getTransformStats.mockResolvedValue({
transforms: [getTransformWithState('stopped')],
count: 1,
});
await startTransformIfNotStarted(mockEsClient, latestFindingsTransform.transform_id, logger);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.getTransformStats).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
expect(mockEsClient.transform.startTransform).toHaveBeenCalledTimes(1);
expect(mockEsClient.transform.startTransform).toHaveBeenCalledWith({
transform_id: latestFindingsTransform.transform_id,
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { transformError } from '@kbn/securitysolution-es-utils';
import { TransformPutTransformRequest } from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { latestFindingsTransform } from './latest_findings_transform';
import { benchmarkScoreTransform } from './benchmark_score_transform';

// TODO: Move transforms to integration package
export const initializeCspTransforms = async (esClient: ElasticsearchClient, logger: Logger) => {
return Promise.all([
initializeTransform(esClient, latestFindingsTransform, logger),
initializeTransform(esClient, benchmarkScoreTransform, logger),
]);
};

export const initializeTransform = async (
esClient: ElasticsearchClient,
transform: TransformPutTransformRequest,
logger: Logger
) => {
return createTransformIfNotExists(esClient, transform, logger).then((succeeded) => {
if (succeeded) {
startTransformIfNotStarted(esClient, transform.transform_id, logger);
}
});
};

/**
* Checks if a transform exists, And if not creates it
*
* @param transform - the transform to create. If a transform with the same transform_id already exists, nothing is created or updated.
*
* @return true if the transform exits or created, false otherwise.
*/
export const createTransformIfNotExists = async (
esClient: ElasticsearchClient,
transform: TransformPutTransformRequest,
logger: Logger
) => {
try {
await esClient.transform.getTransform({
transform_id: transform.transform_id,
});
return true;
} catch (existErr) {
const existError = transformError(existErr);
if (existError.statusCode === 404) {
try {
await esClient.transform.putTransform(transform);
return true;
} catch (createErr) {
const createError = transformError(createErr);
logger.error(
`Failed to create transform ${transform.transform_id}: ${createError.message}`
);
}
} else {
logger.error(
`Failed to check if transform ${transform.transform_id} exists: ${existError.message}`
);
}
}
return false;
};

export const startTransformIfNotStarted = async (
esClient: ElasticsearchClient,
transformId: string,
logger: Logger
) => {
try {
const transformStats = await esClient.transform.getTransformStats({
transform_id: transformId,
});
if (transformStats.count <= 0) {
logger.error(`Failed starting transform ${transformId}: couldn't find transform`);
return;
}
const fetchedTransformStats = transformStats.transforms[0];
if (fetchedTransformStats.state === 'stopped') {
try {
return await esClient.transform.startTransform({ transform_id: transformId });
} catch (startErr) {
const startError = transformError(startErr);
logger.error(`Failed starting transform ${transformId}: ${startError.message}`);
}
} else if (
fetchedTransformStats.state === 'stopping' ||
fetchedTransformStats.state === 'aborting' ||
fetchedTransformStats.state === 'failed'
) {
logger.error(
`Not starting transform ${transformId} since it's state is: ${fetchedTransformStats.state}`
);
}
} catch (statsErr) {
const statsError = transformError(statsErr);
logger.error(`Failed to check if transform ${transformId} is started: ${statsError.message}`);
}
};
Loading

0 comments on commit d55c7b3

Please sign in to comment.