diff --git a/x-pack/plugins/monitoring/common/types/es.ts b/x-pack/plugins/monitoring/common/types/es.ts index 853e140ec66c7..725ff214ae795 100644 --- a/x-pack/plugins/monitoring/common/types/es.ts +++ b/x-pack/plugins/monitoring/common/types/es.ts @@ -24,9 +24,40 @@ export interface ElasticsearchSourceKibanaStats { }; } +export interface ElasticsearchSourceLogstashPipelineVertex { + id: string; + plugin_type: string; + stats?: { + [key: string]: { + data?: any[]; + }; + }; +} + export interface ElasticsearchSource { timestamp: string; kibana_stats?: ElasticsearchSourceKibanaStats; + logstash_state?: { + pipeline?: { + representation?: { + graph?: { + vertices?: ElasticsearchSourceLogstashPipelineVertex[]; + }; + }; + }; + }; + logstash_stats?: { + timestamp?: string; + logstash?: {}; + events?: {}; + reloads?: {}; + queue?: { + type?: string; + }; + jvm?: { + uptime_in_millis?: number; + }; + }; beats_stats?: { timestamp?: string; beat?: { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.js b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts similarity index 65% rename from x-pack/plugins/monitoring/server/lib/logstash/get_node_info.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts index fdfc523e53527..ead8764607786 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts @@ -4,24 +4,31 @@ * you may not use this file except in compliance with the Elastic License. */ -import { get, merge } from 'lodash'; +import { merge } from 'lodash'; +// @ts-ignore import { checkParam } from '../error_missing_required'; -import { calculateAvailability } from './../calculate_availability'; +// @ts-ignore +import { calculateAvailability } from '../calculate_availability'; +import { LegacyRequest, ElasticsearchResponse } from '../../types'; -export function handleResponse(resp) { - const source = get(resp, 'hits.hits[0]._source.logstash_stats'); - const logstash = get(source, 'logstash'); +export function handleResponse(resp: ElasticsearchResponse) { + const source = resp.hits?.hits[0]?._source?.logstash_stats; + const logstash = source?.logstash; const info = merge(logstash, { - availability: calculateAvailability(get(source, 'timestamp')), - events: get(source, 'events'), - reloads: get(source, 'reloads'), - queue_type: get(source, 'queue.type'), - uptime: get(source, 'jvm.uptime_in_millis'), + availability: calculateAvailability(source?.timestamp), + events: source?.events, + reloads: source?.reloads, + queue_type: source?.queue?.type, + uptime: source?.jvm?.uptime_in_millis, }); return info; } -export function getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }) { +export function getNodeInfo( + req: LegacyRequest, + lsIndexPattern: string, + { clusterUuid, logstashUuid }: { clusterUuid: string; logstashUuid: string } +) { checkParam(lsIndexPattern, 'lsIndexPattern in getNodeInfo'); const params = { diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts similarity index 78% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts index 2977159f4206d..0461d69e3d68d 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts @@ -6,16 +6,24 @@ import boom from '@hapi/boom'; import { get } from 'lodash'; +// @ts-ignore import { checkParam } from '../error_missing_required'; import { getPipelineStateDocument } from './get_pipeline_state_document'; +// @ts-ignore import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation'; +// @ts-ignore import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; +import { LegacyRequest } from '../../types'; +import { + ElasticsearchSource, + ElasticsearchSourceLogstashPipelineVertex, +} from '../../../common/types/es'; export function _vertexStats( - vertex, - vertexStatsBucket, - totalProcessorsDurationInMillis, - timeseriesIntervalInSeconds + vertex: ElasticsearchSourceLogstashPipelineVertex, + vertexStatsBucket: any, + totalProcessorsDurationInMillis: number, + timeseriesIntervalInSeconds: number ) { const isInput = vertex.plugin_type === 'input'; const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output'; @@ -27,8 +35,11 @@ export function _vertexStats( const durationInMillis = vertexStatsBucket.duration_in_millis_total.value; - const processorStats = {}; - const eventsProcessedStats = { + const processorStats: any = {}; + const eventsProcessedStats: { + events_out_per_millisecond: number; + events_in_per_millisecond?: number; + } = { events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis, }; @@ -63,14 +74,14 @@ export function _vertexStats( * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds */ export function _enrichStateWithStatsAggregation( - stateDocument, - statsAggregation, - timeseriesIntervalInSeconds + stateDocument: ElasticsearchSource, + statsAggregation: any, + timeseriesIntervalInSeconds: number ) { const logstashState = stateDocument.logstash_state; - const vertices = logstashState.pipeline.representation.graph.vertices; + const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? []; - const verticesById = {}; + const verticesById: any = {}; vertices.forEach((vertex) => { verticesById[vertex.id] = vertex; vertex.stats = {}; @@ -82,7 +93,7 @@ export function _enrichStateWithStatsAggregation( const verticesWithStatsBuckets = statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets; - verticesWithStatsBuckets.forEach((vertexStatsBucket) => { + verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => { // Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval const vertexId = vertexStatsBucket.key; const vertex = verticesById[vertexId]; @@ -98,13 +109,20 @@ export function _enrichStateWithStatsAggregation( } }); - return stateDocument.logstash_state.pipeline; + return stateDocument.logstash_state?.pipeline; } -export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) { +export async function getPipeline( + req: LegacyRequest, + config: { get: (key: string) => string | undefined }, + lsIndexPattern: string, + clusterUuid: string, + pipelineId: string, + version: { firstSeen: string; lastSeen: string; hash: string } +) { checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline'); - const options = { + const options: any = { clusterUuid, pipelineId, version, diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts similarity index 75% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts index dae8d52e6c57b..96419ceb4cc70 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts @@ -4,16 +4,22 @@ * you may not use this file except in compliance with the Elastic License. */ +// @ts-ignore import { createQuery } from '../create_query'; +// @ts-ignore import { LogstashMetric } from '../metrics'; -import { get } from 'lodash'; +import { LegacyRequest, ElasticsearchResponse } from '../../types'; export async function getPipelineStateDocument( - req, - logstashIndexPattern, - { clusterUuid, pipelineId, version } + req: LegacyRequest, + logstashIndexPattern: string, + { + clusterUuid, + pipelineId, + version, + }: { clusterUuid: string; pipelineId: string; version: { hash: string } } ) { - const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); + const { callWithRequest } = req.server.plugins?.elasticsearch.getCluster('monitoring'); const filters = [ { term: { 'logstash_state.pipeline.id': pipelineId } }, { term: { 'logstash_state.pipeline.hash': version.hash } }, @@ -43,8 +49,8 @@ export async function getPipelineStateDocument( }, }; - const resp = await callWithRequest(req, 'search', params); + const resp = (await callWithRequest(req, 'search', params)) as ElasticsearchResponse; // Return null if doc not found - return get(resp, 'hits.hits[0]._source', null); + return resp.hits?.hits[0]?._source ?? null; } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts similarity index 73% rename from x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js rename to x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts index cdbe26d993f75..b275e4c7bc207 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts @@ -6,16 +6,24 @@ import boom from '@hapi/boom'; import { get } from 'lodash'; +// @ts-ignore import { checkParam } from '../error_missing_required'; import { getPipelineStateDocument } from './get_pipeline_state_document'; +// @ts-ignore import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation'; +// @ts-ignore import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; +import { LegacyRequest } from '../../types'; +import { + ElasticsearchSource, + ElasticsearchSourceLogstashPipelineVertex, +} from '../../../common/types/es'; export function _vertexStats( - vertex, - vertexStatsBucket, - totalProcessorsDurationInMillis, - timeseriesIntervalInSeconds + vertex: ElasticsearchSourceLogstashPipelineVertex, + vertexStatsBucket: any, + totalProcessorsDurationInMillis: number, + timeseriesIntervalInSeconds: number ) { const isInput = vertex.plugin_type === 'input'; const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output'; @@ -27,9 +35,12 @@ export function _vertexStats( const durationInMillis = vertexStatsBucket.duration_in_millis_total.value; - const inputStats = {}; - const processorStats = {}; - const eventsProcessedStats = { + const inputStats: any = {}; + const processorStats: any = {}; + const eventsProcessedStats: { + events_out_per_millisecond: number; + events_in_per_millisecond?: number; + } = { events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis, }; @@ -72,21 +83,23 @@ export function _vertexStats( * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds */ export function _enrichVertexStateWithStatsAggregation( - stateDocument, - vertexStatsAggregation, - vertexId, - timeseriesIntervalInSeconds + stateDocument: ElasticsearchSource, + vertexStatsAggregation: any, + vertexId: string, + timeseriesIntervalInSeconds: number ) { const logstashState = stateDocument.logstash_state; - const vertices = logstashState.pipeline.representation.graph.vertices; + const vertices = logstashState?.pipeline?.representation?.graph?.vertices; // First, filter out the vertex we care about - const vertex = vertices.find((v) => v.id === vertexId); - vertex.stats = {}; + const vertex = vertices?.find((v) => v.id === vertexId); + if (vertex) { + vertex.stats = {}; + } // Next, iterate over timeseries metrics and attach them to vertex const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets; - timeSeriesBuckets.forEach((timeSeriesBucket) => { + timeSeriesBuckets.forEach((timeSeriesBucket: any) => { // each bucket calculates stats for total pipeline CPU time for the associated timeseries const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats; const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min; @@ -94,31 +107,35 @@ export function _enrichVertexStateWithStatsAggregation( const timestamp = timeSeriesBucket.key; const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id; - const vertexStats = _vertexStats( - vertex, - vertexStatsBucket, - totalProcessorsDurationInMillis, - timeseriesIntervalInSeconds - ); - Object.keys(vertexStats).forEach((stat) => { - if (!vertex.stats.hasOwnProperty(stat)) { - vertex.stats[stat] = { data: [] }; - } - vertex.stats[stat].data.push([timestamp, vertexStats[stat]]); - }); + if (vertex) { + const vertexStats = _vertexStats( + vertex, + vertexStatsBucket, + totalProcessorsDurationInMillis, + timeseriesIntervalInSeconds + ); + Object.keys(vertexStats).forEach((stat) => { + if (vertex?.stats) { + if (!vertex.stats.hasOwnProperty(stat)) { + vertex.stats[stat] = { data: [] }; + } + vertex.stats[stat].data?.push([timestamp, vertexStats[stat]]); + } + }); + } }); return vertex; } export async function getPipelineVertex( - req, - config, - lsIndexPattern, - clusterUuid, - pipelineId, - version, - vertexId + req: LegacyRequest, + config: { get: (key: string) => string | undefined }, + lsIndexPattern: string, + clusterUuid: string, + pipelineId: string, + version: { hash: string; firstSeen: string; lastSeen: string }, + vertexId: string ) { checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');