[Monitoring] Convert Logstash-related server files that read from _source to typescript #86787

Merged · 4 commits · Jan 12, 2021

Changes from all commits
31 changes: 31 additions & 0 deletions x-pack/plugins/monitoring/common/types/es.ts
```diff
@@ -24,9 +24,40 @@ export interface ElasticsearchSourceKibanaStats {
   };
 }
 
+export interface ElasticsearchSourceLogstashPipelineVertex {
+  id: string;
+  plugin_type: string;
+  stats?: {
+    [key: string]: {
+      data?: any[];
+    };
+  };
+}
+
 export interface ElasticsearchSource {
   timestamp: string;
   kibana_stats?: ElasticsearchSourceKibanaStats;
+  logstash_state?: {
+    pipeline?: {
+      representation?: {
+        graph?: {
+          vertices?: ElasticsearchSourceLogstashPipelineVertex[];
+        };
+      };
+    };
+  };
+  logstash_stats?: {
+    timestamp?: string;
+    logstash?: {};
+    events?: {};
+    reloads?: {};
+    queue?: {
+      type?: string;
+    };
+    jvm?: {
+      uptime_in_millis?: number;
+    };
+  };
   beats_stats?: {
     timestamp?: string;
     beat?: {
```
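The point of marking every nested field optional is that a monitoring document may omit any branch, so consumers short-circuit on missing data with optional chaining instead of reaching through lodash `get`. A minimal sketch of reading through the new types (interfaces inlined here so the snippet stands alone, not imported from `common/types/es`):

```ts
// Inlined copies of the PR's new interfaces so this snippet stands alone.
interface LogstashPipelineVertex {
  id: string;
  plugin_type: string;
  stats?: { [key: string]: { data?: any[] } };
}

interface Source {
  timestamp: string;
  logstash_state?: {
    pipeline?: {
      representation?: { graph?: { vertices?: LogstashPipelineVertex[] } };
    };
  };
}

// A partial document: no logstash_state branch at all.
const doc: Source = { timestamp: '2021-01-12T00:00:00Z' };

// Every level short-circuits, and ?? [] supplies a safe default.
const vertices = doc.logstash_state?.pipeline?.representation?.graph?.vertices ?? [];
console.log(vertices.length); // 0 — no TypeError on the partial document
```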
x-pack/plugins/monitoring/server/lib/logstash/get_node_info.ts

```diff
@@ -4,24 +4,31 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { get, merge } from 'lodash';
+import { merge } from 'lodash';
+// @ts-ignore
 import { checkParam } from '../error_missing_required';
-import { calculateAvailability } from './../calculate_availability';
+// @ts-ignore
+import { calculateAvailability } from '../calculate_availability';
+import { LegacyRequest, ElasticsearchResponse } from '../../types';
 
-export function handleResponse(resp) {
-  const source = get(resp, 'hits.hits[0]._source.logstash_stats');
-  const logstash = get(source, 'logstash');
+export function handleResponse(resp: ElasticsearchResponse) {
+  const source = resp.hits?.hits[0]?._source?.logstash_stats;
+  const logstash = source?.logstash;
   const info = merge(logstash, {
-    availability: calculateAvailability(get(source, 'timestamp')),
-    events: get(source, 'events'),
-    reloads: get(source, 'reloads'),
-    queue_type: get(source, 'queue.type'),
-    uptime: get(source, 'jvm.uptime_in_millis'),
+    availability: calculateAvailability(source?.timestamp),
+    events: source?.events,
+    reloads: source?.reloads,
+    queue_type: source?.queue?.type,
+    uptime: source?.jvm?.uptime_in_millis,
   });
   return info;
 }
 
-export function getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }) {
+export function getNodeInfo(
+  req: LegacyRequest,
+  lsIndexPattern: string,
+  { clusterUuid, logstashUuid }: { clusterUuid: string; logstashUuid: string }
+) {
   checkParam(lsIndexPattern, 'lsIndexPattern in getNodeInfo');
 
   const params = {
```
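The conversion pattern throughout this file swaps stringly-typed `get` paths for optional chaining the compiler can check against `ElasticsearchResponse`. A small sketch of the equivalence, using a hypothetical `source` value rather than a real monitoring document (lodash is assumed available, as it is in this plugin):

```ts
import { get } from 'lodash';

// A document missing the whole jvm branch:
const source: { jvm?: { uptime_in_millis?: number } } = {};

// Before: the path is an unchecked string; a typo fails silently at runtime.
const before = get(source, 'jvm.uptime_in_millis'); // undefined
// After: real property access; a typo is a compile-time error.
const after = source.jvm?.uptime_in_millis; // undefined

console.log(before === after); // true — both short-circuit to undefined
```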
x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts

```diff
@@ -6,16 +6,24 @@
 
 import boom from '@hapi/boom';
 import { get } from 'lodash';
+// @ts-ignore
 import { checkParam } from '../error_missing_required';
 import { getPipelineStateDocument } from './get_pipeline_state_document';
+// @ts-ignore
 import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation';
+// @ts-ignore
 import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
+import { LegacyRequest } from '../../types';
+import {
+  ElasticsearchSource,
+  ElasticsearchSourceLogstashPipelineVertex,
+} from '../../../common/types/es';
 
 export function _vertexStats(
-  vertex,
-  vertexStatsBucket,
-  totalProcessorsDurationInMillis,
-  timeseriesIntervalInSeconds
+  vertex: ElasticsearchSourceLogstashPipelineVertex,
+  vertexStatsBucket: any,
+  totalProcessorsDurationInMillis: number,
+  timeseriesIntervalInSeconds: number
 ) {
   const isInput = vertex.plugin_type === 'input';
   const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
@@ -27,8 +35,11 @@ export function _vertexStats(
 
   const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
 
-  const processorStats = {};
-  const eventsProcessedStats = {
+  const processorStats: any = {};
+  const eventsProcessedStats: {
+    events_out_per_millisecond: number;
+    events_in_per_millisecond?: number;
+  } = {
     events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
   };
 
@@ -63,14 +74,14 @@
  * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
  */
 export function _enrichStateWithStatsAggregation(
-  stateDocument,
-  statsAggregation,
-  timeseriesIntervalInSeconds
+  stateDocument: ElasticsearchSource,
+  statsAggregation: any,
+  timeseriesIntervalInSeconds: number
 ) {
   const logstashState = stateDocument.logstash_state;
-  const vertices = logstashState.pipeline.representation.graph.vertices;
+  const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];
 
-  const verticesById = {};
+  const verticesById: any = {};
   vertices.forEach((vertex) => {
     verticesById[vertex.id] = vertex;
     vertex.stats = {};
@@ -82,7 +93,7 @@ export function _enrichStateWithStatsAggregation(
 
   const verticesWithStatsBuckets =
     statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets;
-  verticesWithStatsBuckets.forEach((vertexStatsBucket) => {
+  verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
     // Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
     const vertexId = vertexStatsBucket.key;
     const vertex = verticesById[vertexId];
@@ -98,13 +109,20 @@ export function _enrichStateWithStatsAggregation(
     }
   });
 
-  return stateDocument.logstash_state.pipeline;
+  return stateDocument.logstash_state?.pipeline;
 }
 
-export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) {
+export async function getPipeline(
+  req: LegacyRequest,
+  config: { get: (key: string) => string | undefined },
+  lsIndexPattern: string,
+  clusterUuid: string,
+  pipelineId: string,
+  version: { firstSeen: string; lastSeen: string; hash: string }
+) {
   checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
 
-  const options = {
+  const options: any = {
     clusterUuid,
     pipelineId,
     version,
```
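The explicit annotation on `eventsProcessedStats` matters: without it, TypeScript infers the literal's exact shape from its initializer and rejects the later conditional assignment of `events_in_per_millisecond`. A stripped-down illustration (names shortened, values made up):

```ts
// TypeScript infers { events_out: number } for this literal, so adding a
// key later would be a compile error:
const inferred = { events_out: 2.5 };
// inferred.events_in = 1.25; // error: Property 'events_in' does not exist

// Declaring the optional member up front keeps the later assignment legal:
const stats: { events_out: number; events_in?: number } = { events_out: 2.5 };
stats.events_in = 1.25; // OK — matches the declared optional property
console.log(inferred, stats);
```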
x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_state_document.ts

```diff
@@ -4,16 +4,22 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
+// @ts-ignore
 import { createQuery } from '../create_query';
+// @ts-ignore
 import { LogstashMetric } from '../metrics';
-import { get } from 'lodash';
+import { LegacyRequest, ElasticsearchResponse } from '../../types';
 
 export async function getPipelineStateDocument(
-  req,
-  logstashIndexPattern,
-  { clusterUuid, pipelineId, version }
+  req: LegacyRequest,
+  logstashIndexPattern: string,
+  {
+    clusterUuid,
+    pipelineId,
+    version,
+  }: { clusterUuid: string; pipelineId: string; version: { hash: string } }
 ) {
-  const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
+  const { callWithRequest } = req.server.plugins?.elasticsearch.getCluster('monitoring');
   const filters = [
     { term: { 'logstash_state.pipeline.id': pipelineId } },
     { term: { 'logstash_state.pipeline.hash': version.hash } },
@@ -43,8 +49,8 @@ export async function getPipelineStateDocument(
     },
   };
 
-  const resp = await callWithRequest(req, 'search', params);
+  const resp = (await callWithRequest(req, 'search', params)) as ElasticsearchResponse;
 
   // Return null if doc not found
-  return get(resp, 'hits.hits[0]._source', null);
+  return resp.hits?.hits[0]?._source ?? null;
 }
```
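The last hunk keeps the old contract of returning `null` when no document matched: lodash `get` with a `null` default becomes optional chaining plus `?? null`. A standalone sketch against a hypothetical response shape (not the plugin's real `ElasticsearchResponse`):

```ts
interface SearchResponse {
  hits?: { hits: Array<{ _source?: object }> };
}

function firstSource(resp: SearchResponse): object | null {
  // Optional chaining short-circuits on a missing hits envelope or an empty
  // hits array; `?? null` restores the explicit null the lodash default gave.
  return resp.hits?.hits[0]?._source ?? null;
}

console.log(firstSource({})); // null
console.log(firstSource({ hits: { hits: [] } })); // null
console.log(firstSource({ hits: { hits: [{ _source: { id: 1 } }] } })); // { id: 1 }
```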
x-pack/plugins/monitoring/server/lib/logstash/get_pipeline_vertex.ts

```diff
@@ -6,16 +6,24 @@
 
 import boom from '@hapi/boom';
 import { get } from 'lodash';
+// @ts-ignore
 import { checkParam } from '../error_missing_required';
 import { getPipelineStateDocument } from './get_pipeline_state_document';
+// @ts-ignore
 import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation';
+// @ts-ignore
 import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
+import { LegacyRequest } from '../../types';
+import {
+  ElasticsearchSource,
+  ElasticsearchSourceLogstashPipelineVertex,
+} from '../../../common/types/es';
 
 export function _vertexStats(
-  vertex,
-  vertexStatsBucket,
-  totalProcessorsDurationInMillis,
-  timeseriesIntervalInSeconds
+  vertex: ElasticsearchSourceLogstashPipelineVertex,
+  vertexStatsBucket: any,
+  totalProcessorsDurationInMillis: number,
+  timeseriesIntervalInSeconds: number
 ) {
   const isInput = vertex.plugin_type === 'input';
   const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
@@ -27,9 +35,12 @@ export function _vertexStats(
 
   const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
 
-  const inputStats = {};
-  const processorStats = {};
-  const eventsProcessedStats = {
+  const inputStats: any = {};
+  const processorStats: any = {};
+  const eventsProcessedStats: {
+    events_out_per_millisecond: number;
+    events_in_per_millisecond?: number;
+  } = {
     events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
   };
 
@@ -72,53 +83,59 @@
  * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
  */
 export function _enrichVertexStateWithStatsAggregation(
-  stateDocument,
-  vertexStatsAggregation,
-  vertexId,
-  timeseriesIntervalInSeconds
+  stateDocument: ElasticsearchSource,
+  vertexStatsAggregation: any,
+  vertexId: string,
+  timeseriesIntervalInSeconds: number
 ) {
   const logstashState = stateDocument.logstash_state;
-  const vertices = logstashState.pipeline.representation.graph.vertices;
+  const vertices = logstashState?.pipeline?.representation?.graph?.vertices;
 
   // First, filter out the vertex we care about
-  const vertex = vertices.find((v) => v.id === vertexId);
-  vertex.stats = {};
+  const vertex = vertices?.find((v) => v.id === vertexId);
+  if (vertex) {
+    vertex.stats = {};
+  }
 
   // Next, iterate over timeseries metrics and attach them to vertex
   const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
-  timeSeriesBuckets.forEach((timeSeriesBucket) => {
+  timeSeriesBuckets.forEach((timeSeriesBucket: any) => {
     // each bucket calculates stats for total pipeline CPU time for the associated timeseries
     const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats;
     const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
 
     const timestamp = timeSeriesBucket.key;
 
     const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
-    const vertexStats = _vertexStats(
-      vertex,
-      vertexStatsBucket,
-      totalProcessorsDurationInMillis,
-      timeseriesIntervalInSeconds
-    );
-    Object.keys(vertexStats).forEach((stat) => {
-      if (!vertex.stats.hasOwnProperty(stat)) {
-        vertex.stats[stat] = { data: [] };
-      }
-      vertex.stats[stat].data.push([timestamp, vertexStats[stat]]);
-    });
+    if (vertex) {
+      const vertexStats = _vertexStats(
+        vertex,
+        vertexStatsBucket,
+        totalProcessorsDurationInMillis,
+        timeseriesIntervalInSeconds
+      );
+      Object.keys(vertexStats).forEach((stat) => {
+        if (vertex?.stats) {
+          if (!vertex.stats.hasOwnProperty(stat)) {
+            vertex.stats[stat] = { data: [] };
+          }
+          vertex.stats[stat].data?.push([timestamp, vertexStats[stat]]);
+        }
+      });
+    }
   });
 
   return vertex;
 }
 
 export async function getPipelineVertex(
-  req,
-  config,
-  lsIndexPattern,
-  clusterUuid,
-  pipelineId,
-  version,
-  vertexId
+  req: LegacyRequest,
+  config: { get: (key: string) => string | undefined },
+  lsIndexPattern: string,
+  clusterUuid: string,
+  pipelineId: string,
+  version: { hash: string; firstSeen: string; lastSeen: string },
+  vertexId: string
 ) {
   checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
 
```
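Since `vertices` can now be `undefined`, `find` may return `undefined` too, which is why the stats-attachment logic gained `if (vertex)` guards; inside a guard TypeScript narrows the type and allows the property writes. A condensed sketch of that narrowing, with made-up data:

```ts
interface Vertex {
  id: string;
  stats?: { [key: string]: { data?: Array<[number, number]> } };
}

// Stand-in for reading vertices out of a pipeline state document.
function loadVertices(): Vertex[] | undefined {
  return [{ id: 'v1' }];
}

const vertices = loadVertices(); // Vertex[] | undefined
const vertex = vertices?.find((v) => v.id === 'v1'); // Vertex | undefined

if (vertex) {
  // Narrowed to Vertex here, so the writes below type-check.
  vertex.stats = vertex.stats ?? {};
  if (!vertex.stats.hasOwnProperty('millis_per_event')) {
    vertex.stats.millis_per_event = { data: [] };
  }
  vertex.stats.millis_per_event.data?.push([1610409600000, 0.42]);
}
console.log(JSON.stringify(vertex)); // {"id":"v1","stats":{"millis_per_event":...}}
```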