diff --git a/x-pack/plugins/apm/server/plugin.ts b/x-pack/plugins/apm/server/plugin.ts index cf5be4369f79e3..8d83f762e2023e 100644 --- a/x-pack/plugins/apm/server/plugin.ts +++ b/x-pack/plugins/apm/server/plugin.ts @@ -130,19 +130,20 @@ export class APMPlugin registerFeaturesUsage({ licensingPlugin: plugins.licensing }); + const { ruleDataService } = plugins.ruleRegistry; const getCoreStart = () => core.getStartServices().then(([coreStart]) => coreStart); const ready = once(async () => { - const componentTemplateName = plugins.ruleRegistry.getFullAssetName( + const componentTemplateName = ruleDataService.getFullAssetName( 'apm-mappings' ); - if (!plugins.ruleRegistry.isWriteEnabled()) { + if (!ruleDataService.isWriteEnabled()) { return; } - await plugins.ruleRegistry.createOrUpdateComponentTemplate({ + await ruleDataService.createOrUpdateComponentTemplate({ name: componentTemplateName, body: { template: { @@ -167,16 +168,14 @@ export class APMPlugin }, }); - await plugins.ruleRegistry.createOrUpdateIndexTemplate({ - name: plugins.ruleRegistry.getFullAssetName('apm-index-template'), + await ruleDataService.createOrUpdateIndexTemplate({ + name: ruleDataService.getFullAssetName('apm-index-template'), body: { index_patterns: [ - plugins.ruleRegistry.getFullAssetName('observability-apm*'), + ruleDataService.getFullAssetName('observability-apm*'), ], composed_of: [ - plugins.ruleRegistry.getFullAssetName( - TECHNICAL_COMPONENT_TEMPLATE_NAME - ), + ruleDataService.getFullAssetName(TECHNICAL_COMPONENT_TEMPLATE_NAME), componentTemplateName, ], }, @@ -188,7 +187,7 @@ export class APMPlugin }); const ruleDataClient = new RuleDataClient({ - alias: plugins.ruleRegistry.getFullAssetName('observability-apm'), + alias: ruleDataService.getFullAssetName('observability-apm'), getClusterClient: async () => { const coreStart = await getCoreStart(); return coreStart.elasticsearch.client.asInternalUser; diff --git a/x-pack/plugins/observability/server/plugin.ts b/x-pack/plugins/observability/server/plugin.ts index 046a9a62d5fa79..9eff1b08cead98 100644 --- a/x-pack/plugins/observability/server/plugin.ts +++ b/x-pack/plugins/observability/server/plugin.ts @@ -57,7 +57,7 @@ export class ObservabilityPlugin implements Plugin { return coreStart.elasticsearch.client.asInternalUser; }, ready: () => Promise.resolve(), - alias: plugins.ruleRegistry.getFullAssetName(), + alias: plugins.ruleRegistry.ruleDataService.getFullAssetName(), }); registerRoutes({ diff --git a/x-pack/plugins/rule_registry/kibana.json b/x-pack/plugins/rule_registry/kibana.json index 7e3f8bf6afb721..8c1e8d0f5e40ed 100644 --- a/x-pack/plugins/rule_registry/kibana.json +++ b/x-pack/plugins/rule_registry/kibana.json @@ -8,6 +8,8 @@ ], "requiredPlugins": [ "alerting", + "data", + "spaces", "triggersActionsUi" ], "server": true diff --git a/x-pack/plugins/rule_registry/server/config.ts b/x-pack/plugins/rule_registry/server/config.ts new file mode 100644 index 00000000000000..498b6d16a6fdac --- /dev/null +++ b/x-pack/plugins/rule_registry/server/config.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const config = { + schema: schema.object({ + enabled: schema.boolean({ defaultValue: true }), + write: schema.object({ + enabled: schema.boolean({ defaultValue: true }), + }), + index: schema.string({ defaultValue: '.alerts' }), + }), +}; + +export type RuleRegistryPluginConfig = TypeOf; diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index.ts new file mode 100644 index 00000000000000..1941208ed07cdf --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './index_bootstrapper'; +export * from './index_management_gateway'; +export * from './index_reader'; +export * from './index_writer'; +export * from './resources/ilm_policy'; +export * from './resources/index_mappings'; +export * from './resources/index_names'; diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_bootstrapper.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_bootstrapper.ts new file mode 100644 index 00000000000000..b0c3927cd7dfe6 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_bootstrapper.ts @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { PublicMethodsOf } from '@kbn/utility-types'; +import { Logger } from 'src/core/server'; + +import { IndexNames } from './resources/index_names'; +import { IndexMappings } from './resources/index_mappings'; +import { createIndexTemplate } from './resources/index_template'; +import { IlmPolicy, defaultIlmPolicy } from './resources/ilm_policy'; +import { IIndexManagementGateway } from './index_management_gateway'; + +interface ConstructorParams { + gateway: IIndexManagementGateway; + logger: Logger; +} + +export interface IndexSpecification { + indexNames: IndexNames; + indexMappings: IndexMappings; + ilmPolicy?: IlmPolicy; +} + +export type IIndexBootstrapper = PublicMethodsOf; + +// TODO: Converge with the logic of .siem-signals index bootstrapping +// x-pack/plugins/security_solution/server/lib/detection_engine/routes/index/create_index_route.ts + +// TODO: Handle race conditions and potential errors between multiple instances of Kibana +// trying to bootstrap the same index. Possible options: +// - robust idempotent logic with error handling +// - leveraging task_manager to make sure bootstrapping is run only once at a time +// - using some sort of distributed lock +// Maybe we can check how Saved Objects service bootstraps .kibana index + +export class IndexBootstrapper { + private readonly gateway: IIndexManagementGateway; + private readonly logger: Logger; + + constructor(params: ConstructorParams) { + this.gateway = params.gateway; + this.logger = params.logger.get('IndexBootstrapper'); + } + + public async run(indexSpec: IndexSpecification): Promise { + this.logger.debug('bootstrapping elasticsearch resources starting'); + + try { + const { indexNames, indexMappings, ilmPolicy } = indexSpec; + await this.createIlmPolicyIfNotExists(indexNames, ilmPolicy); + await this.createIndexTemplateIfNotExists(indexNames, indexMappings); + await this.createInitialIndexIfNotExists(indexNames); + } catch (err) { + this.logger.error(`error bootstrapping elasticsearch resources: ${err.message}`); + return false; + } + + this.logger.debug('bootstrapping elasticsearch resources complete'); + return true; + } + + private async createIlmPolicyIfNotExists(names: IndexNames, policy?: IlmPolicy): Promise { + const { indexIlmPolicyName } = names; + + const exists = await this.gateway.doesIlmPolicyExist(indexIlmPolicyName); + if (!exists) { + const ilmPolicy = policy ?? defaultIlmPolicy; + await this.gateway.createIlmPolicy(indexIlmPolicyName, ilmPolicy); + } + } + + private async createIndexTemplateIfNotExists( + names: IndexNames, + mappings: IndexMappings + ): Promise { + const { indexTemplateName } = names; + + const templateVersion = 1; // TODO: get from EventSchema definition + const template = createIndexTemplate(names, mappings, templateVersion); + + const exists = await this.gateway.doesIndexTemplateExist(indexTemplateName); + if (!exists) { + await this.gateway.createIndexTemplate(indexTemplateName, template); + } else { + await this.gateway.updateIndexTemplate(indexTemplateName, template); + } + } + + private async createInitialIndexIfNotExists(names: IndexNames): Promise { + const { indexAliasName, indexInitialName } = names; + + const exists = await this.gateway.doesAliasExist(indexAliasName); + if (!exists) { + await this.gateway.createIndex(indexInitialName, { + aliases: { + [indexAliasName]: { + is_write_index: true, + }, + }, + }); + } + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_management_gateway.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_management_gateway.ts new file mode 100644 index 00000000000000..96b5860e143eea --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_management_gateway.ts @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { PublicMethodsOf } from '@kbn/utility-types'; +import { ElasticsearchClient, Logger } from 'src/core/server'; +import { IlmPolicy } from './resources/ilm_policy'; +import { IndexTemplate } from './resources/index_template'; + +interface ConstructorParams { + elasticsearch: Promise; + logger: Logger; +} + +export type IIndexManagementGateway = PublicMethodsOf; + +export class IndexManagementGateway { + private readonly elasticsearch: Promise; + private readonly logger: Logger; + + constructor(params: ConstructorParams) { + this.elasticsearch = params.elasticsearch; + this.logger = params.logger.get('IndexManagementGateway'); + } + + public async doesIlmPolicyExist(policyName: string): Promise { + this.logger.debug(`Checking if ILM policy exists; name="${policyName}"`); + + try { + const es = await this.elasticsearch; + await es.transport.request({ + method: 'GET', + path: `/_ilm/policy/${policyName}`, + }); + } catch (e) { + if (e.statusCode === 404) return false; + throw new Error(`Error checking existence of ILM policy: ${e.message}`); + } + return true; + } + + public async createIlmPolicy(policyName: string, policy: IlmPolicy): Promise { + this.logger.debug(`Creating ILM policy; name="${policyName}"`); + + try { + const es = await this.elasticsearch; + await es.transport.request({ + method: 'PUT', + path: `/_ilm/policy/${policyName}`, + body: policy, + }); + } catch (e) { + throw new Error(`Error creating ILM policy: ${e.message}`); + } + } + + public async doesIndexTemplateExist(templateName: string): Promise { + this.logger.debug(`Checking if index template exists; name="${templateName}"`); + + try { + const es = await this.elasticsearch; + const { body } = await es.indices.existsTemplate({ name: templateName }); + return body as boolean; + } catch (e) { + throw new Error(`Error checking existence of index template: ${e.message}`); + } + } + + public async createIndexTemplate(templateName: string, template: IndexTemplate): Promise { + this.logger.debug(`Creating index template; name="${templateName}"`); + + try { + const es = await this.elasticsearch; + await es.indices.putTemplate({ create: true, name: templateName, body: template }); + } catch (e) { + // The error message doesn't have a type attribute we can look to guarantee it's due + // to the template already existing (only long message) so we'll check ourselves to see + // if the template now exists. This scenario would happen if you startup multiple Kibana + // instances at the same time. + const existsNow = await this.doesIndexTemplateExist(templateName); + if (!existsNow) { + const error = new Error(`Error creating index template: ${e.message}`); + Object.assign(error, { wrapped: e }); + throw error; + } + } + } + + public async updateIndexTemplate(templateName: string, template: IndexTemplate): Promise { + this.logger.debug(`Updating index template; name="${templateName}"`); + + try { + const { settings, ...templateWithoutSettings } = template; + + const es = await this.elasticsearch; + await es.indices.putTemplate({ + create: false, + name: templateName, + body: templateWithoutSettings, + }); + } catch (e) { + throw new Error(`Error updating index template: ${e.message}`); + } + } + + public async doesAliasExist(aliasName: string): Promise { + this.logger.debug(`Checking if index alias exists; name="${aliasName}"`); + + try { + const es = await this.elasticsearch; + const { body } = await es.indices.existsAlias({ name: aliasName }); + return body as boolean; + } catch (e) { + throw new Error(`Error checking existence of initial index: ${e.message}`); + } + } + + public async createIndex(indexName: string, body: Record = {}): Promise { + this.logger.debug(`Creating index; name="${indexName}"`); + this.logger.debug(JSON.stringify(body, null, 2)); + + try { + const es = await this.elasticsearch; + await es.indices.create({ + index: indexName, + body, + }); + } catch (e) { + if (e.body?.error?.type !== 'resource_already_exists_exception') { + this.logger.error(e); + this.logger.error(JSON.stringify(e.meta, null, 2)); + throw new Error(`Error creating initial index: ${e.message}`); + } + } + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_reader.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_reader.ts new file mode 100644 index 00000000000000..84c0b41f7e1a03 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_reader.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { PublicMethodsOf } from '@kbn/utility-types'; +import { estypes } from '@elastic/elasticsearch'; +import { Logger, ElasticsearchClient } from 'src/core/server'; + +interface ConstructorParams { + indexName: string; + elasticsearch: Promise; + logger: Logger; +} + +export type IIndexReader = PublicMethodsOf; + +export class IndexReader { + private readonly indexName: string; + private readonly elasticsearch: Promise; + private readonly logger: Logger; + + constructor(params: ConstructorParams) { + this.indexName = params.indexName; + this.elasticsearch = params.elasticsearch; + this.logger = params.logger.get('IndexReader'); + } + + public async search(request: estypes.SearchRequest) { + const requestToSend: estypes.SearchRequest = { + ...request, + index: this.indexName, + }; + + this.logger.debug(`Searching; request: ${JSON.stringify(requestToSend, null)}`); + + const esClient = await this.elasticsearch; + const response = await esClient.search(requestToSend); + + return response; + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts new file mode 100644 index 00000000000000..7f83421ec80d85 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_writer.ts @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { PublicMethodsOf } from '@kbn/utility-types'; +import util from 'util'; +import { Logger, ElasticsearchClient } from 'src/core/server'; +import { BufferedStream } from './utils/buffered_stream'; + +type Document = Record; + +interface BufferItem { + index: string; + doc: Document; +} + +interface ConstructorParams { + indexName: string; + elasticsearch: Promise; + isWriteEnabled: boolean; + logger: Logger; +} + +export type IIndexWriter = PublicMethodsOf; + +export class IndexWriter { + private readonly indexName: string; + private readonly elasticsearch: Promise; + private readonly isWriteEnabled: boolean; + private readonly logger: Logger; + private readonly buffer: BufferedStream; + + constructor(params: ConstructorParams) { + this.indexName = params.indexName; + this.elasticsearch = params.elasticsearch; + this.isWriteEnabled = params.isWriteEnabled; + this.logger = params.logger.get('IndexWriter'); + + this.buffer = new BufferedStream({ + flush: (items) => this.bulkIndex(items), + }); + } + + public indexOne(doc: Document): void { + if (this.isWriteEnabled) { + this.logger.debug('Buffering 1 document'); + this.buffer.enqueue({ index: this.indexName, doc }); + } + } + + public indexMany(docs: Document[]): void { + if (this.isWriteEnabled) { + this.logger.debug(`Buffering ${docs.length} documents`); + docs.forEach((doc) => { + this.buffer.enqueue({ index: this.indexName, doc }); + }); + } + } + + public async close(): Promise { + await this.buffer.closeAndWaitUntilFlushed(); + } + + private async bulkIndex(items: BufferItem[]): Promise { + this.logger.debug(`Indexing ${items.length} documents`); + + const bulkBody: Array> = []; + + for (const item of items) { + if (item.doc === undefined) continue; + + bulkBody.push({ create: { _index: item.index } }); + bulkBody.push(item.doc); + } + + try { + const es = await this.elasticsearch; + const response = await es.bulk({ body: bulkBody }); + + if (response.body.errors) { + const error = new Error('Error writing some bulk events'); + error.stack += '\n' + util.inspect(response.body.items, { depth: null }); + this.logger.error(error); + } + } catch (e) { + this.logger.error( + `error writing bulk events: "${e.message}"; docs: ${JSON.stringify(bulkBody)}` + ); + } + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/ilm_policy.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/ilm_policy.ts new file mode 100644 index 00000000000000..00fc9131523ace --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/ilm_policy.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { estypes } from '@elastic/elasticsearch'; + +export interface IlmPolicy { + policy: estypes.Policy; +} + +export const defaultIlmPolicy: IlmPolicy = { + policy: { + phases: { + hot: { + min_age: '0ms', + actions: { + rollover: { + max_age: '90d', + max_size: '50gb', + }, + }, + }, + delete: { + actions: { + delete: {}, + }, + }, + }, + }, +}; diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_mappings.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_mappings.ts new file mode 100644 index 00000000000000..064bde5001f7bf --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_mappings.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export interface IndexMappings { + dynamic: 'strict' | boolean; + properties: Record; + _meta?: Record; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_names.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_names.ts new file mode 100644 index 00000000000000..1082c62b95e70d --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_names.ts @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export interface IndexParams { + /** @example '.alerts' */ + indexPrefix: string; + + /** @example 'security', 'security.alerts', 'observability.events' */ + logName: string; + + /** @example 'default' */ + kibanaSpaceId: string; +} + +export interface IndexNames extends IndexParams { + /** @example '.alerts-security.alerts' */ + indexBaseName: string; + + /** @example '.alerts-security.alerts-*' */ + indexBasePattern: string; + + /** @example '.alerts-security.alerts-default' */ + indexAliasName: string; + + /** @example '.alerts-security.alerts-default-*' */ + indexAliasPattern: string; + + /** @example '.alerts-security.alerts-default-policy' */ + indexIlmPolicyName: string; + + /** @example '.alerts-security.alerts-default-template' */ + indexTemplateName: string; + + /** @example '.alerts-security.alerts-default-000001' */ + indexInitialName: string; +} + +export abstract class IndexNames { + public static create(params: IndexParams): IndexNames { + const { indexPrefix, logName, kibanaSpaceId } = params; + + // TODO: validate params + + const indexBaseName = joinWithDash(indexPrefix, logName); + const indexBasePattern = joinWithDash(indexPrefix, logName, '*'); + const indexAliasName = joinWithDash(indexPrefix, logName, kibanaSpaceId); + const indexAliasPattern = joinWithDash(indexPrefix, logName, kibanaSpaceId, '*'); + const indexIlmPolicyName = joinWithDash(indexPrefix, logName, kibanaSpaceId, 'policy'); + const indexTemplateName = joinWithDash(indexPrefix, logName, kibanaSpaceId, 'template'); + const indexInitialName = joinWithDash(indexPrefix, logName, kibanaSpaceId, '000001'); + + return { + indexPrefix, + logName, + kibanaSpaceId, + indexBaseName, + indexBasePattern, + indexAliasName, + indexAliasPattern, + indexIlmPolicyName, + indexTemplateName, + indexInitialName, + }; + } + + public static createChild(parent: IndexNames, logName: string): IndexNames { + return this.create({ + indexPrefix: parent.indexPrefix, + logName: this.createChildLogName(parent.logName, logName), + kibanaSpaceId: parent.kibanaSpaceId, + }); + } + + public static createChildLogName(parentLogName: string, logName: string): string { + return joinWithDot(parentLogName, logName); + } +} + +const joinWithDash = (...names: string[]): string => names.join('-'); +const joinWithDot = (...names: string[]): string => names.join('.'); diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_template.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_template.ts new file mode 100644 index 00000000000000..caf71dadf6e1e5 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/resources/index_template.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { estypes } from '@elastic/elasticsearch'; +import { IndexNames } from './index_names'; +import { IndexMappings } from './index_mappings'; + +export type IndexTemplate = estypes.PutIndexTemplateRequest['body']; + +export const createIndexTemplate = ( + names: IndexNames, + mappings: IndexMappings, + version: number +): IndexTemplate => { + const { indexAliasName, indexAliasPattern, indexIlmPolicyName } = names; + + return { + index_patterns: [indexAliasPattern], + settings: { + number_of_shards: 1, // TODO: do we need to set this? + auto_expand_replicas: '0-1', // TODO: do we need to set? + index: { + lifecycle: { + name: indexIlmPolicyName, + rollover_alias: indexAliasName, + }, + }, + mapping: { + total_fields: { + limit: 10000, + }, + }, + sort: { + field: '@timestamp', + order: 'desc', + }, + }, + mappings: { + ...mappings, + _meta: { + ...mappings._meta, + version, + }, + }, + version, + }; +}; diff --git a/x-pack/plugins/rule_registry/server/event_log/elasticsearch/utils/buffered_stream.ts b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/utils/buffered_stream.ts new file mode 100644 index 00000000000000..d968cd5a0ac685 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/elasticsearch/utils/buffered_stream.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Subject } from 'rxjs'; +import { bufferTime, filter as rxFilter, switchMap } from 'rxjs/operators'; + +export const DEFAULT_BUFFER_TIME_MS = 1000; +export const DEFAULT_BUFFER_SIZE = 100; + +interface ConstructorParams { + maxBufferTimeMs?: number; + maxBufferSize?: number; + flush: (items: TItem[]) => Promise; +} + +// TODO: handle possible exceptions in flush and maybe add retry logic + +export class BufferedStream { + private readonly buffer$: Subject; + private readonly whenBufferCompleteAndFlushed: Promise; + + constructor(params: ConstructorParams) { + const maxBufferTime = params.maxBufferTimeMs ?? DEFAULT_BUFFER_TIME_MS; + const maxBufferSize = params.maxBufferSize ?? DEFAULT_BUFFER_SIZE; + + this.buffer$ = new Subject(); + + // Buffer items for time/buffer length, ignore empty buffers, + // then flush the buffered items; kick things off with a promise + // on the observable, which we'll wait on in shutdown + this.whenBufferCompleteAndFlushed = this.buffer$ + .pipe( + bufferTime(maxBufferTime, null, maxBufferSize), + rxFilter((docs) => docs.length > 0), + switchMap(async (docs) => await params.flush(docs)) + ) + .toPromise(); + } + + public enqueue(item: TItem): void { + this.buffer$.next(item); + } + + public async closeAndWaitUntilFlushed(): Promise { + this.buffer$.complete(); + await this.whenBufferCompleteAndFlushed; + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/event_schema/index.ts b/x-pack/plugins/rule_registry/server/event_log/event_schema/index.ts new file mode 100644 index 00000000000000..77c041a4059b56 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/event_schema/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './schema_types'; +export * from './schema'; diff --git a/x-pack/plugins/rule_registry/server/event_log/event_schema/schema.ts b/x-pack/plugins/rule_registry/server/event_log/event_schema/schema.ts new file mode 100644 index 00000000000000..9b5d94918a83f6 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/event_schema/schema.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { EventSchema, Event } from './schema_types'; +import { FieldMap, runtimeTypeFromFieldMap, mergeFieldMaps } from '../../../common/field_map'; +import { + TechnicalRuleFieldMaps, + technicalRuleFieldMap, +} from '../../../common/assets/field_maps/technical_rule_field_map'; + +const baseSchema = createSchema(technicalRuleFieldMap); + +export abstract class Schema { + public static create(fields: TMap): EventSchema { + return createSchema(fields); + } + + public static combine( + s1: EventSchema, + s2: EventSchema + ): EventSchema { + const combinedFields = mergeFieldMaps(s1.objectFields, s2.objectFields); + return createSchema(combinedFields); + } + + public static getBase(): EventSchema { + return baseSchema; + } + + public static extendBase( + fields: TMap + ): EventSchema { + const extensionSchema = createSchema(fields); + return this.combine(baseSchema, extensionSchema); + } +} + +function createSchema(fields: TMap): EventSchema { + const objectType: Event = ({} as unknown) as Event; + const runtimeType = runtimeTypeFromFieldMap(fields); + + return { + objectFields: fields, + objectType, + runtimeType, + }; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/event_schema/schema_types.ts b/x-pack/plugins/rule_registry/server/event_log/event_schema/schema_types.ts new file mode 100644 index 00000000000000..e5c665652fe974 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/event_schema/schema_types.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FieldMap, FieldMapType, TypeOfFieldMap } from '../../../common/field_map'; + +export interface EventSchema { + objectFields: TMap; + objectType: Event; + runtimeType: EventRuntimeType; +} + +export type Event = TypeOfFieldMap; + +export type EventRuntimeType = FieldMapType; + +export { FieldMap }; diff --git a/x-pack/plugins/rule_registry/server/event_log/index.ts b/x-pack/plugins/rule_registry/server/event_log/index.ts new file mode 100644 index 00000000000000..cf7467588c22fe --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './elasticsearch'; +export * from './event_schema'; +export * from './log'; diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log.ts new file mode 100644 index 00000000000000..2b1ecde48d2db0 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { estypes } from '@elastic/elasticsearch'; +import { DeepPartial } from '../utils/utility_types'; +import { IndexNames } from '../elasticsearch'; +import { IEventLog, IEventLogger, IEventLoggerTemplate, IEventQueryBuilder } from './public_api'; +import { EventLogParams } from './internal_api'; +import { EventLoggerTemplate } from './event_logger_template'; +import { EventQueryBuilder } from './event_query_builder'; + +export class EventLog implements IEventLog { + private readonly params: EventLogParams; + private readonly initialTemplate: IEventLoggerTemplate; + + constructor(params: EventLogParams) { + this.params = params; + this.initialTemplate = new EventLoggerTemplate({ + ...params, + eventLoggerName: '', + eventFields: {}, + }); + } + + public getNames(): IndexNames { + return this.params.indexNames; + } + + public getLoggerTemplate(fields: DeepPartial): IEventLoggerTemplate { + return this.initialTemplate.getLoggerTemplate(fields); + } + + public getLogger(loggerName: string, fields?: DeepPartial): IEventLogger { + return this.initialTemplate.getLogger(loggerName, fields); + } + + public getQueryBuilder(): IEventQueryBuilder { + return new EventQueryBuilder(this.params); + } + + public async search( + request: estypes.SearchRequest + ): Promise> { + const response = await this.params.indexReader.search(request); + return response.body; + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_bootstrapper.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_bootstrapper.ts new file mode 100644 index 00000000000000..0498a7cd97b2f5 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_bootstrapper.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from 'kibana/server'; +import { IIndexBootstrapper, IndexSpecification } from '../elasticsearch'; + +interface ConstructorParams { + indexSpec: IndexSpecification; + indexBootstrapper: IIndexBootstrapper; + isWriteEnabled: boolean; + logger: Logger; +} + +export class EventLogBootstrapper { + private readonly indexSpec: IndexSpecification; + private readonly indexBootstrapper: IIndexBootstrapper; + private readonly logger: Logger; + private readonly isWriteEnabled: boolean; + private isIndexBootstrapped: boolean; + + constructor(params: ConstructorParams) { + this.indexSpec = params.indexSpec; + this.indexBootstrapper = params.indexBootstrapper; + this.logger = params.logger.get('EventLogBootstrapper'); + this.isWriteEnabled = params.isWriteEnabled; + this.isIndexBootstrapped = false; + } + + public async run(): Promise { + if (this.isIndexBootstrapped || !this.isWriteEnabled) { + return; + } + + const { logName, indexAliasName } = this.indexSpec.indexNames; + const logInfo = `log="${logName}" index="${indexAliasName}"`; + + this.logger.debug(`Bootstrapping started, ${logInfo}`); + this.isIndexBootstrapped = await this.indexBootstrapper.run(this.indexSpec); + this.logger.debug( + `Bootstrapping ${this.isIndexBootstrapped ? 'succeeded' : 'failed'}, ${logInfo}` + ); + + if (!this.isIndexBootstrapped) { + throw new Error(`Event log bootstrapping failed, ${logInfo}`); + } + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_definition.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_definition.ts new file mode 100644 index 00000000000000..124664d5578b00 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_definition.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IlmPolicy, defaultIlmPolicy, IndexNames } from '../elasticsearch'; +import { EventSchema, FieldMap, Schema } from '../event_schema'; +import { EventLogOptions, IEventLogDefinition } from './public_api'; + +export class EventLogDefinition implements IEventLogDefinition { + public readonly eventLogName: string; + public readonly eventSchema: EventSchema; + public readonly ilmPolicy: IlmPolicy; + + constructor(options: EventLogOptions) { + // TODO: validate options; options.name should not contain "-" and "." + this.eventLogName = options.name; + this.eventSchema = options.schema; + this.ilmPolicy = options.ilmPolicy ?? defaultIlmPolicy; + } + + public defineChild( + options: EventLogOptions + ): IEventLogDefinition { + const childName = IndexNames.createChildLogName(this.eventLogName, options.name); + const childSchema = Schema.combine(this.eventSchema, options.schema); + const childPolicy = options.ilmPolicy ?? this.ilmPolicy; + + return new EventLogDefinition({ + name: childName, + schema: childSchema, + ilmPolicy: childPolicy, + }); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_provider.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_provider.ts new file mode 100644 index 00000000000000..d1ecd6a977a080 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_provider.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IIndexWriter } from '../elasticsearch'; +import { IEventLog } from './public_api'; +import { IEventLogProvider } from './internal_api'; +import { EventLogBootstrapper } from './event_log_bootstrapper'; + +interface ConstructorParams { + log: IEventLog; + logBootstrapper: EventLogBootstrapper; + indexWriter: IIndexWriter; +} + +export class EventLogProvider implements IEventLogProvider { + constructor(private readonly params: ConstructorParams) {} + + public getLog(): IEventLog { + return this.params.log; + } + + public async bootstrapLog(): Promise { + await this.params.logBootstrapper.run(); + } + + public async shutdownLog(): Promise { + await this.params.indexWriter.close(); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_registry.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_registry.ts new file mode 100644 index 00000000000000..52f6c6bd918d49 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_registry.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Event, FieldMap } from '../event_schema'; +import { IEventLogDefinition } from './public_api'; +import { IEventLogRegistry, IEventLogProvider } from './internal_api'; + +const getRegistryKey = (definition: IEventLogDefinition, spaceId: string) => + `${definition.eventLogName}-${spaceId}`; + +interface RegistryEntry { + definition: IEventLogDefinition; + spaceId: string; + provider: IEventLogProvider; +} + +export class EventLogRegistry implements IEventLogRegistry { + private readonly map = new Map(); + + public get( + definition: IEventLogDefinition, + spaceId: string + ): IEventLogProvider> | null { + const key = getRegistryKey(definition, spaceId); + const entry = this.map.get(key); + return entry != null ? (entry.provider as IEventLogProvider>) : null; + } + + public add( + definition: IEventLogDefinition, + spaceId: string, + provider: IEventLogProvider> + ): void { + const key = getRegistryKey(definition, spaceId); + + if (this.map.has(key)) { + throw new Error(`Event log already registered, key="${key}"`); + } + + this.map.set(key, { + definition, + spaceId, + provider, + }); + } + + public async shutdown(): Promise { + const entries = Array.from(this.map.values()); + const promises = entries.map(({ provider }) => provider.shutdownLog()); + await Promise.all(promises); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_resolver.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_resolver.ts new file mode 100644 index 00000000000000..8440f554323041 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_resolver.ts @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + IndexBootstrapper, + IndexManagementGateway, + IndexNames, + IndexReader, + IndexSpecification, + IndexWriter, +} from '../elasticsearch'; + +import { Event, FieldMap } from '../event_schema'; +import { IEventLogRegistry, IEventLogProvider } from './internal_api'; +import { + EventLogServiceConfig, + EventLogServiceDependencies, + IEventLog, + IEventLogDefinition, + IEventLogResolver, +} from './public_api'; + +import { EventLog } from './event_log'; +import { EventLogBootstrapper } from './event_log_bootstrapper'; +import { EventLogProvider } from './event_log_provider'; +import { mappingFromFieldMap } from './utils/mapping_from_field_map'; + +export class EventLogResolver implements IEventLogResolver { + private readonly indexBootstrapper: IndexBootstrapper; + + constructor( + private readonly config: EventLogServiceConfig, + private readonly deps: EventLogServiceDependencies, + private readonly registry: IEventLogRegistry, + private readonly bootstrapLog: boolean + ) { + this.indexBootstrapper = this.createIndexBootstrapper(); + } + + public async resolve( + definition: IEventLogDefinition, + kibanaSpaceId: string + ): Promise>> { + const provider = this.resolveLogProvider(definition, kibanaSpaceId); + + if (this.bootstrapLog) { + await provider.bootstrapLog(); + } + + return provider.getLog(); + } + + private resolveLogProvider( + definition: IEventLogDefinition, + kibanaSpaceId: string + ): IEventLogProvider> { + const existingProvider = this.registry.get(definition, kibanaSpaceId); + if (existingProvider) { + return existingProvider; + } + + const indexSpec = this.createIndexSpec(definition, kibanaSpaceId); + const indexReader = this.createIndexReader(indexSpec); + const indexWriter = this.createIndexWriter(indexSpec); + const logBootstrapper = this.createEventLogBootstrapper(indexSpec); + const log = this.createEventLog(indexSpec, indexReader, indexWriter); + const logProvider = new EventLogProvider({ + log, + logBootstrapper, + indexWriter, + }); + + this.registry.add(definition, kibanaSpaceId, logProvider); + + return logProvider; + } + + private createIndexSpec( + definition: IEventLogDefinition, + kibanaSpaceId: string + ): IndexSpecification { + const { indexPrefix } = this.config; + const { eventLogName, eventSchema, ilmPolicy } = definition; + + const indexNames = IndexNames.create({ + indexPrefix, + logName: eventLogName, + kibanaSpaceId, + }); + + const indexMappings = mappingFromFieldMap(eventSchema.objectFields); + + return { indexNames, indexMappings, ilmPolicy }; + } + + private createIndexBootstrapper(): IndexBootstrapper { + const { clusterClient, logger } = this.deps; + + return new IndexBootstrapper({ + gateway: new IndexManagementGateway({ + elasticsearch: clusterClient.then((c) => c.asInternalUser), + logger, + }), + logger, + }); + } + + private createIndexReader(indexSpec: IndexSpecification): IndexReader { + const { clusterClient, logger } = this.deps; + const { indexNames } = indexSpec; + + return new IndexReader({ + indexName: indexNames.indexAliasPattern, + elasticsearch: clusterClient.then((c) => c.asInternalUser), // TODO: internal or current? + logger, + }); + } + + private createIndexWriter(indexSpec: IndexSpecification): IndexWriter { + const { clusterClient, logger } = this.deps; + const { isWriteEnabled } = this.config; + const { indexNames } = indexSpec; + + return new IndexWriter({ + indexName: indexNames.indexAliasName, + elasticsearch: clusterClient.then((c) => c.asInternalUser), // TODO: internal or current? + isWriteEnabled, + logger, + }); + } + + private createEventLogBootstrapper(indexSpec: IndexSpecification): EventLogBootstrapper { + const { logger } = this.deps; + const { isWriteEnabled } = this.config; + + return new EventLogBootstrapper({ + indexSpec, + indexBootstrapper: this.indexBootstrapper, + isWriteEnabled, + logger, + }); + } + + private createEventLog( + indexSpec: IndexSpecification, + indexReader: IndexReader, + indexWriter: IndexWriter + ): IEventLog> { + const { logger } = this.deps; + + return new EventLog>({ + indexNames: indexSpec.indexNames, + indexReader, + indexWriter, + logger, + }); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_log_service.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_log_service.ts new file mode 100644 index 00000000000000..b5b1d23f2e2158 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_log_service.ts @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaRequest } from 'kibana/server'; + +import { Event, FieldMap } from '../event_schema'; +import { + EventLogServiceConfig, + EventLogServiceDependencies, + IEventLog, + IEventLogDefinition, + IEventLogResolver, + IEventLogService, + IScopedEventLogResolver, +} from './public_api'; + +import { EventLogRegistry } from './event_log_registry'; +import { EventLogResolver } from './event_log_resolver'; + +const BOOTSTRAP_BY_DEFAULT = true; + +interface ConstructorParams { + config: EventLogServiceConfig; + dependencies: EventLogServiceDependencies; +} + +export class EventLogService implements IEventLogService { + private readonly registry: EventLogRegistry; + + constructor(private readonly params: ConstructorParams) { + this.registry = new EventLogRegistry(); + } + + public getResolver(bootstrapLog = BOOTSTRAP_BY_DEFAULT): IEventLogResolver { + const { params, registry } = this; + const { config, dependencies } = params; + + return new EventLogResolver(config, dependencies, registry, bootstrapLog); + } + + public getScopedResolver( + request: KibanaRequest, + bootstrapLog = BOOTSTRAP_BY_DEFAULT + ): IScopedEventLogResolver { + const resolver = this.getResolver(bootstrapLog); + + return { + resolve: async ( + definition: IEventLogDefinition + ): Promise>> => { + const spaces = await this.params.dependencies.spacesService; + const spaceId = spaces.getSpaceId(request); + + const log = await resolver.resolve(definition, spaceId); + return log; + }, + }; + } + + public async stop(): Promise { + await this.registry.shutdown(); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_logger.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_logger.ts new file mode 100644 index 00000000000000..c6f88f49835d70 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_logger.ts @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { DeepPartial } from '../utils/utility_types'; +import { mergeFields } from '../utils/fields'; +import { EventLoggerParams } from './internal_api'; +import { IEventLogger, IEventLoggerTemplate } from './public_api'; + +export class EventLogger implements IEventLogger { + private readonly params: EventLoggerParams; + private readonly ownTemplate: IEventLoggerTemplate; + + constructor(params: EventLoggerParams, template: IEventLoggerTemplate) { + this.params = params; + this.ownTemplate = template; + } + + public getLoggerTemplate(fields: DeepPartial): IEventLoggerTemplate { + return this.ownTemplate.getLoggerTemplate(fields); + } + + public getLogger(name: string, fields?: DeepPartial): IEventLogger { + return this.ownTemplate.getLogger(name, fields); + } + + public logEvent(fields: DeepPartial): void { + const { eventFields, indexWriter } = this.params; + + const event = mergeFields(eventFields, fields); + indexWriter.indexOne(event); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_logger_template.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_logger_template.ts new file mode 100644 index 00000000000000..3872a5c744269c --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_logger_template.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { DeepPartial } from '../utils/utility_types'; +import { mergeFields } from '../utils/fields'; +import { IEventLogger, IEventLoggerTemplate } from './public_api'; +import { EventLoggerParams } from './internal_api'; +import { EventLogger } from './event_logger'; + +export class EventLoggerTemplate implements IEventLoggerTemplate { + private readonly params: EventLoggerParams; + + constructor(params: EventLoggerParams) { + this.params = params; + } + + public getLoggerTemplate(fields: DeepPartial): IEventLoggerTemplate { + const nextParams = this.getNextParams('', fields); + return new EventLoggerTemplate(nextParams); + } + + public getLogger(name: string, fields?: DeepPartial): IEventLogger { + const nextParams = this.getNextParams(name, fields); + const nextTemplate = new EventLoggerTemplate(nextParams); + return new EventLogger(nextParams, nextTemplate); + } + + private getNextParams( + extName: string, + extFields?: DeepPartial + ): EventLoggerParams { + const { indexNames, eventLoggerName, eventFields } = this.params; + + const baseName = eventLoggerName; + const nextName = [baseName, extName].filter(Boolean).join('.'); + + const baseFields = eventFields; + const nextFields = mergeFields(baseFields, extFields, { + // TODO: Define a schema for own fields used/set by event log. Add it to the base schema. + // Then maybe introduce a base type for TEvent. + 'kibana.rac.event_log.log_name': indexNames.logName, + 'kibana.rac.event_log.logger_name': nextName, + } as any); + + return { + ...this.params, + eventLoggerName: nextName, + eventFields: nextFields, + }; + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_query.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_query.ts new file mode 100644 index 00000000000000..0eabe4be64837a --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_query.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { estypes } from '@elastic/elasticsearch'; +import { IIndexReader } from '../elasticsearch'; +import { truthy } from '../utils/predicates'; +import { IEventQuery } from './public_api'; + +export interface EventQueryParams { + indexReader: IIndexReader; + request: estypes.SearchRequest; +} + +export class EventQuery implements IEventQuery { + constructor(private readonly params: EventQueryParams) {} + + public async execute(): Promise { + const { indexReader, request } = this.params; + + const response = await indexReader.search(request); + return response.body.hits.hits.map((hit) => hit._source).filter(truthy); + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/event_query_builder.ts b/x-pack/plugins/rule_registry/server/event_log/log/event_query_builder.ts new file mode 100644 index 00000000000000..48228ce5352b7b --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/event_query_builder.ts @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { getFlattenedObject } from '@kbn/std'; +import { estypes } from '@elastic/elasticsearch'; +import { esKuery } from '../../../../../../src/plugins/data/server'; + +import { DeepPartial } from '../utils/utility_types'; +import { mergeFields } from '../utils/fields'; +import { EventLogParams } from './internal_api'; +import { IEventQueryBuilder, IEventQuery, SortingParams, PaginationParams } from './public_api'; +import { EventQuery } from './event_query'; + +export class EventQueryBuilder implements IEventQueryBuilder { + private readonly params: EventLogParams; + private loggerName: string; + private fields: DeepPartial | null; + private kql: string; + private sorting: SortingParams; + private pagination: PaginationParams; + + constructor(params: EventLogParams) { + this.params = params; + this.loggerName = ''; + this.fields = null; + this.kql = ''; + this.sorting = [{ '@timestamp': { order: 'desc' } }, { 'event.sequence': { order: 'desc' } }]; + this.pagination = { page: 1, perPage: 20 }; + } + + public filterByLogger(loggerName: string): IEventQueryBuilder { + this.loggerName = loggerName; + return this; + } + + public filterByFields(fields: DeepPartial): IEventQueryBuilder { + this.fields = mergeFields(this.fields ?? {}, fields); + return this; + } + + public filterByKql(kql: string): IEventQueryBuilder { + this.kql = kql; + return this; + } + + public sortBy(params: SortingParams): IEventQueryBuilder { + this.sorting = params; + return this; + } + + public paginate(params: PaginationParams): IEventQueryBuilder { + this.pagination = params; + return this; + } + + public buildQuery(): IEventQuery { + const { indexReader } = this.params; + const { page, perPage } = this.pagination; + + const request: estypes.SearchRequest = { + track_total_hits: true, + body: { + from: (page - 1) * perPage, + size: perPage, + sort: this.sorting, + query: { + bool: { + filter: this.buildFilter(), + }, + }, + }, + }; + + return new EventQuery({ indexReader, request }); + } + + private buildFilter(): estypes.QueryContainer[] { + const result: estypes.QueryContainer[] = []; + + if (this.loggerName) { + result.push({ + term: { 'kibana.rac.event_log.logger_name': this.loggerName }, + }); + } + + if (this.fields) { + const flatFields = getFlattenedObject(this.fields); + Object.entries(flatFields) + .map(([key, value]) => { + const queryName = Array.isArray(value) ? 'terms' : 'term'; + return { [queryName]: { [key]: value } }; + }) + .forEach((query) => { + result.push(query); + }); + } + + if (this.kql) { + const dsl = esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(this.kql)); + const queries = Array.isArray(dsl) ? dsl : [dsl]; + result.push(...queries); + } + + return result; + } +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/index.ts b/x-pack/plugins/rule_registry/server/event_log/log/index.ts new file mode 100644 index 00000000000000..e5593390733e4a --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './event_log_definition'; +export * from './event_log_service'; +export * from './public_api'; diff --git a/x-pack/plugins/rule_registry/server/event_log/log/internal_api.ts b/x-pack/plugins/rule_registry/server/event_log/log/internal_api.ts new file mode 100644 index 00000000000000..8db931b35912d1 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/internal_api.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from 'kibana/server'; + +import { IIndexReader, IIndexWriter, IndexNames } from '../elasticsearch'; +import { Event, FieldMap } from '../event_schema'; +import { DeepPartial } from '../utils/utility_types'; +import { IEventLogDefinition, IEventLog } from './public_api'; + +export interface IEventLogRegistry { + get( + definition: IEventLogDefinition, + spaceId: string + ): IEventLogProvider> | null; + + add( + definition: IEventLogDefinition, + spaceId: string, + provider: IEventLogProvider> + ): void; + + shutdown(): Promise; +} + +export interface IEventLogProvider { + getLog(): IEventLog; + bootstrapLog(): Promise; + shutdownLog(): Promise; +} + +export interface EventLogParams { + indexNames: IndexNames; + indexReader: IIndexReader; + indexWriter: IIndexWriter; + logger: Logger; +} + +export interface EventLoggerParams extends EventLogParams { + eventLoggerName: string; + eventFields: DeepPartial; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/public_api.ts b/x-pack/plugins/rule_registry/server/event_log/log/public_api.ts new file mode 100644 index 00000000000000..7807dd9ca6b36b --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/public_api.ts @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { estypes } from '@elastic/elasticsearch'; +import { IClusterClient, KibanaRequest, Logger } from 'kibana/server'; +import { SpacesServiceStart } from '../../../../spaces/server'; + +import { IlmPolicy, IndexNames, IndexSpecification } from '../elasticsearch'; +import { FieldMap, Event, EventSchema } from '../event_schema'; +import { DeepPartial } from '../utils/utility_types'; + +export { IlmPolicy, IndexSpecification }; + +// ------------------------------------------------------------------------------------------------- +// Definition API (defining log hierarchies as simple objects) + +export interface EventLogOptions { + name: string; + schema: EventSchema; + ilmPolicy?: IlmPolicy; +} + +export interface IEventLogDefinition { + eventLogName: string; + eventSchema: EventSchema; + ilmPolicy: IlmPolicy; + + defineChild( + options: EventLogOptions + ): IEventLogDefinition; +} + +// ------------------------------------------------------------------------------------------------- +// Resolving and bootstrapping API (creating runtime objects representing logs, bootstrapping indices) + +export interface EventLogServiceConfig { + indexPrefix: string; + isWriteEnabled: boolean; +} + +export interface EventLogServiceDependencies { + clusterClient: Promise; + spacesService: Promise; + logger: Logger; +} + +export interface IEventLogService { + getResolver(bootstrapLog?: boolean): IEventLogResolver; + getScopedResolver(request: KibanaRequest, bootstrapLog?: boolean): IScopedEventLogResolver; +} + +export interface IEventLogResolver { + resolve( + definition: IEventLogDefinition, + spaceId: string + ): Promise>>; +} + +export interface IScopedEventLogResolver { + resolve( + definition: IEventLogDefinition + ): Promise>>; +} + +export interface IEventLog extends IEventLoggerTemplate { + getNames(): IndexNames; + + getQueryBuilder(): IEventQueryBuilder; + + search( + request: estypes.SearchRequest + ): Promise>; +} + +// ------------------------------------------------------------------------------------------------- +// Write API (logging events) + +export interface IEventLoggerTemplate { + getLoggerTemplate(fields: DeepPartial): IEventLoggerTemplate; + getLogger(name: string, fields?: DeepPartial): IEventLogger; +} + +export interface IEventLogger extends IEventLoggerTemplate { + logEvent(fields: DeepPartial): void; +} + +// ------------------------------------------------------------------------------------------------- +// Read API (searching, filtering, sorting, pagination, aggregation over events) + +export interface IEventQueryBuilder { + filterByLogger(loggerName: string): IEventQueryBuilder; + filterByFields(fields: DeepPartial): IEventQueryBuilder; + filterByKql(kql: string): IEventQueryBuilder; + sortBy(params: SortingParams): IEventQueryBuilder; + paginate(params: PaginationParams): IEventQueryBuilder; + + buildQuery(): IEventQuery; +} + +export type SortingParams = estypes.Sort; + +export interface PaginationParams { + page: number; + perPage: number; +} + +export interface IEventQuery { + execute(): Promise; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/log/utils/mapping_from_field_map.ts b/x-pack/plugins/rule_registry/server/event_log/log/utils/mapping_from_field_map.ts new file mode 100644 index 00000000000000..fd5dc3ae022881 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/log/utils/mapping_from_field_map.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { set } from '@elastic/safer-lodash-set'; +import { FieldMap } from '../../../../common/field_map'; +import { IndexMappings } from '../../elasticsearch'; + +export function mappingFromFieldMap(fieldMap: FieldMap): IndexMappings { + const mappings = { + dynamic: 'strict' as const, + properties: {}, + }; + + const fields = Object.keys(fieldMap).map((key) => { + const field = fieldMap[key]; + return { + name: key, + ...field, + }; + }); + + fields.forEach((field) => { + const { name, required, array, ...rest } = field; + + set(mappings.properties, field.name.split('.').join('.properties.'), rest); + }); + + return mappings; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/utils/fields.ts b/x-pack/plugins/rule_registry/server/event_log/utils/fields.ts new file mode 100644 index 00000000000000..4f140cfb3434f1 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/utils/fields.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { merge } from 'lodash'; +import { DeepPartial } from './utility_types'; + +export const mergeFields = ( + base: DeepPartial, + ext1?: DeepPartial, + ext2?: DeepPartial, + ext3?: DeepPartial +): DeepPartial => { + return merge({}, base, ext1 ?? {}, ext2 ?? {}, ext3 ?? {}); +}; diff --git a/x-pack/plugins/rule_registry/server/event_log/utils/predicates.ts b/x-pack/plugins/rule_registry/server/event_log/utils/predicates.ts new file mode 100644 index 00000000000000..40daac1fedcc63 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/utils/predicates.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function nonNullable(value: T): value is NonNullable { + return value !== null && value !== undefined; +} + +export type Truthy = T extends false | '' | 0 | null | undefined ? never : T; // from lodash + +export function truthy(value: T): value is Truthy { + return Boolean(value); +} diff --git a/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.test.ts b/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.test.ts new file mode 100644 index 00000000000000..f01d438ce79a00 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.test.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createReadySignal, ReadySignal } from './ready_signal'; + +describe('ReadySignal', () => { + let readySignal: ReadySignal; + + beforeEach(() => { + readySignal = createReadySignal(); + }); + + test('works as expected', async () => { + readySignal.signal(42); + const ready = await readySignal.wait(); + expect(ready).toBe(42); + }); +}); diff --git a/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.ts b/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.ts new file mode 100644 index 00000000000000..0512def2b59779 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/utils/ready_signal.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export interface ReadySignal { + wait(): Promise; + signal(value: T): void; +} + +export function createReadySignal(): ReadySignal { + let resolver: (value: T) => void; + + const promise = new Promise((resolve) => { + resolver = resolve; + }); + + async function wait(): Promise { + return await promise; + } + + function signal(value: T) { + resolver(value); + } + + return { wait, signal }; +} diff --git a/x-pack/plugins/rule_registry/server/event_log/utils/utility_types.ts b/x-pack/plugins/rule_registry/server/event_log/utils/utility_types.ts new file mode 100644 index 00000000000000..78e145740da544 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/event_log/utils/utility_types.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export type DeepWriteable = { -readonly [P in keyof T]: DeepWriteable }; + +export type DeepPartial = { + [P in keyof T]?: T[P] extends Array ? Array> : DeepPartial; +}; diff --git a/x-pack/plugins/rule_registry/server/index.ts b/x-pack/plugins/rule_registry/server/index.ts index b51ba3e10f91a9..9547f165cd7058 100644 --- a/x-pack/plugins/rule_registry/server/index.ts +++ b/x-pack/plugins/rule_registry/server/index.ts @@ -5,27 +5,15 @@ * 2.0. */ -import { schema, TypeOf } from '@kbn/config-schema'; import { PluginInitializerContext } from 'src/core/server'; import { RuleRegistryPlugin } from './plugin'; +export * from './config'; export type { RuleRegistryPluginSetupContract, RuleRegistryPluginStartContract } from './plugin'; export { RuleDataClient } from './rule_data_client'; export { IRuleDataClient } from './rule_data_client/types'; export { getRuleExecutorData, RuleExecutorData } from './utils/get_rule_executor_data'; export { createLifecycleRuleTypeFactory } from './utils/create_lifecycle_rule_type_factory'; -export const config = { - schema: schema.object({ - enabled: schema.boolean({ defaultValue: true }), - write: schema.object({ - enabled: schema.boolean({ defaultValue: true }), - }), - index: schema.string({ defaultValue: '.alerts' }), - }), -}; - -export type RuleRegistryPluginConfig = TypeOf; - export const plugin = (initContext: PluginInitializerContext) => new RuleRegistryPlugin(initContext); diff --git a/x-pack/plugins/rule_registry/server/plugin.ts b/x-pack/plugins/rule_registry/server/plugin.ts index 3c645f98f5c719..043b07f9d67c12 100644 --- a/x-pack/plugins/rule_registry/server/plugin.ts +++ b/x-pack/plugins/rule_registry/server/plugin.ts @@ -5,45 +5,99 @@ * 2.0. */ -import { PluginInitializerContext, Plugin, CoreSetup } from 'src/core/server'; +import { PluginInitializerContext, Plugin, CoreSetup, Logger } from 'src/core/server'; +import { SpacesPluginStart } from '../../spaces/server'; + +import { RuleRegistryPluginConfig } from './config'; import { RuleDataPluginService } from './rule_data_plugin_service'; -import { RuleRegistryPluginConfig } from '.'; +import { EventLogService, IEventLogService } from './event_log'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +interface RuleRegistryPluginSetupDependencies {} + +interface RuleRegistryPluginStartDependencies { + spaces: SpacesPluginStart; +} + +export interface RuleRegistryPluginSetupContract { + ruleDataService: RuleDataPluginService; + eventLogService: IEventLogService; +} -export type RuleRegistryPluginSetupContract = RuleDataPluginService; export type RuleRegistryPluginStartContract = void; -export class RuleRegistryPlugin implements Plugin { - constructor(private readonly initContext: PluginInitializerContext) { - this.initContext = initContext; +export class RuleRegistryPlugin + implements + Plugin< + RuleRegistryPluginSetupContract, + RuleRegistryPluginStartContract, + RuleRegistryPluginSetupDependencies, + RuleRegistryPluginStartDependencies + > { + private readonly config: RuleRegistryPluginConfig; + private readonly logger: Logger; + private eventLogService: EventLogService | null; + + constructor(initContext: PluginInitializerContext) { + this.config = initContext.config.get(); + this.logger = initContext.logger.get(); + this.eventLogService = null; } - public setup(core: CoreSetup): RuleRegistryPluginSetupContract { - const config = this.initContext.config.get(); + public setup( + core: CoreSetup + ): RuleRegistryPluginSetupContract { + const { config, logger } = this; - const logger = this.initContext.logger.get(); + const startDependencies = core.getStartServices().then(([coreStart, pluginStart]) => { + return { + core: coreStart, + ...pluginStart, + }; + }); - const service = new RuleDataPluginService({ + const ruleDataService = new RuleDataPluginService({ logger, isWriteEnabled: config.write.enabled, index: config.index, getClusterClient: async () => { - const [coreStart] = await core.getStartServices(); - - return coreStart.elasticsearch.client.asInternalUser; + const deps = await startDependencies; + return deps.core.elasticsearch.client.asInternalUser; }, }); - service.init().catch((originalError) => { + ruleDataService.init().catch((originalError) => { const error = new Error('Failed installing assets'); // @ts-ignore error.stack = originalError.stack; logger.error(error); }); - return service; + const eventLogService = new EventLogService({ + config: { + indexPrefix: this.config.index, + isWriteEnabled: this.config.write.enabled, + }, + dependencies: { + clusterClient: startDependencies.then((deps) => deps.core.elasticsearch.client), + spacesService: startDependencies.then((deps) => deps.spaces.spacesService), + logger: logger.get('eventLog'), + }, + }); + + this.eventLogService = eventLogService; + return { ruleDataService, eventLogService }; } public start(): RuleRegistryPluginStartContract {} - public stop() {} + public stop() { + const { eventLogService, logger } = this; + + if (eventLogService) { + eventLogService.stop().catch((e) => { + logger.error(e); + }); + } + } } diff --git a/x-pack/plugins/rule_registry/tsconfig.json b/x-pack/plugins/rule_registry/tsconfig.json index 707e1ccb98dad9..5aefe9769da22b 100644 --- a/x-pack/plugins/rule_registry/tsconfig.json +++ b/x-pack/plugins/rule_registry/tsconfig.json @@ -10,7 +10,9 @@ "include": ["common/**/*", "server/**/*", "public/**/*", "../../../typings/**/*"], "references": [ { "path": "../../../src/core/tsconfig.json" }, + { "path": "../../../src/plugins/data/tsconfig.json" }, { "path": "../alerting/tsconfig.json" }, + { "path": "../spaces/tsconfig.json" }, { "path": "../triggers_actions_ui/tsconfig.json" } ] }