Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logs UI] Logs overview queries for the observability dashboard #70413

Merged
merged 16 commits into from
Jul 3, 2020
Merged
7 changes: 7 additions & 0 deletions x-pack/plugins/infra/common/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export const DEFAULT_SOURCE_ID = 'default';
268 changes: 199 additions & 69 deletions x-pack/plugins/infra/public/utils/logs_overview_fetchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,90 +4,220 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { InfraClientCoreSetup } from '../types';
import { LogsFetchDataResponse } from '../../../observability/public';
import { encode } from 'rison-node';
import { i18n } from '@kbn/i18n';
import { DEFAULT_SOURCE_ID } from '../../common/constants';
import { InfraClientCoreSetup, InfraClientStartDeps } from '../types';
import {
FetchData,
LogsFetchDataResponse,
HasData,
FetchDataParams,
} from '../../../observability/public';
import { callFetchLogSourceConfigurationAPI } from '../containers/logs/log_source/api/fetch_log_source_configuration';
import { callFetchLogSourceStatusAPI } from '../containers/logs/log_source/api/fetch_log_source_status';

export function getLogsHasDataFetcher(getStartServices: InfraClientCoreSetup['getStartServices']) {
return async () => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
interface StatsAggregation {
buckets: Array<{ key: string; doc_count: number }>;
}

interface SeriesAggregation {
buckets: Array<{
key_as_string: string;
key: number;
doc_count: number;
dataset: StatsAggregation;
}>;
}

interface LogParams {
index: string;
timestampField: string;
}

// if you need a core dep, we need to pass in more than just getStartServices
type StatsAndSeries = Pick<LogsFetchDataResponse, 'stats' | 'series'>;

// perform query
return true;
export function getLogsHasDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
): HasData {
return async () => {
const [core] = await getStartServices();
const sourceStatus = await callFetchLogSourceStatusAPI(DEFAULT_SOURCE_ID, core.http.fetch);
return sourceStatus.data.logIndicesExist;
};
}

export function getLogsOverviewDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
) {
return async (): Promise<LogsFetchDataResponse> => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
): FetchData<LogsFetchDataResponse> {
return async (params) => {
const [core, startPlugins] = await getStartServices();
const { data } = startPlugins;

const sourceConfiguration = await callFetchLogSourceConfigurationAPI(
DEFAULT_SOURCE_ID,
core.http.fetch
);

const { stats, series } = await fetchLogsOverview(
{
index: sourceConfiguration.data.configuration.logAlias,
timestampField: sourceConfiguration.data.configuration.fields.timestamp,
},
params,
data
);

// if you need a core dep, we need to pass in more than just getStartServices
const timeSpanInMinutes =
(Date.parse(params.endTime).valueOf() - Date.parse(params.startTime).valueOf()) / (1000 * 60);

// perform query
return {
title: 'Log rate',
appLink: 'TBD', // TODO: what format should this be in, relative I assume?
stats: {
nginx: {
type: 'number',
label: 'nginx',
value: 345341,
},
'elasticsearch.audit': {
type: 'number',
label: 'elasticsearch.audit',
value: 164929,
title: i18n.translate('xpack.infra.logs.logOverview.logOverviewTitle', {
defaultMessage: 'Logs',
}),
appLink: `/app/logs/stream?logPosition=(end:${encode(params.endTime)},start:${encode(
params.startTime
)})`,
stats: normalizeStats(stats, timeSpanInMinutes),
series: normalizeSeries(series),
};
};
}

async function fetchLogsOverview(
logParams: LogParams,
params: FetchDataParams,
dataPlugin: InfraClientStartDeps['data']
): Promise<StatsAndSeries> {
const esSearcher = dataPlugin.search.getSearchStrategy('es');
return new Promise((resolve, reject) => {
esSearcher
.search({
params: {
index: logParams.index,
body: {
size: 0,
query: buildLogOverviewQuery(logParams, params),
aggs: buildLogOverviewAggregations(logParams, params),
},
},
'haproxy.log': {
type: 'number',
label: 'haproxy.log',
value: 51101,
})
.subscribe(
(response) => {
if (response.rawResponse.aggregations) {
resolve(processLogsOverviewAggregations(response.rawResponse.aggregations));
} else {
resolve({ stats: {}, series: {} });
}
},
(error) => reject(error)
);
});
}

function buildLogOverviewQuery(logParams: LogParams, params: FetchDataParams) {
return {
range: {
[logParams.timestampField]: {
gt: params.startTime,
lte: params.endTime,
format: 'strict_date_optional_time',
},
// Note: My understanding is that these series coordinates will be
// combined into objects that look like:
// { x: timestamp, y: value, g: label (e.g. nginx) }
// so they fit the stacked bar chart API
// https://elastic.github.io/elastic-charts/?path=/story/bar-chart--stacked-with-axis-and-legend
series: {
nginx: {
label: 'nginx',
coordinates: [
{ x: 1593000000000, y: 10014 },
{ x: 1593000900000, y: 12827 },
{ x: 1593001800000, y: 2946 },
{ x: 1593002700000, y: 14298 },
{ x: 1593003600000, y: 4096 },
],
},
'elasticsearch.audit': {
label: 'elasticsearch.audit',
coordinates: [
{ x: 1593000000000, y: 5676 },
{ x: 1593000900000, y: 6783 },
{ x: 1593001800000, y: 2394 },
{ x: 1593002700000, y: 4554 },
{ x: 1593003600000, y: 5659 },
],
},
'haproxy.log': {
label: 'haproxy.log',
coordinates: [
{ x: 1593000000000, y: 9085 },
{ x: 1593000900000, y: 9002 },
{ x: 1593001800000, y: 3940 },
{ x: 1593002700000, y: 5451 },
{ x: 1593003600000, y: 9133 },
],
},
};
}

function buildLogOverviewAggregations(logParams: LogParams, params: FetchDataParams) {
return {
stats: {
terms: {
field: 'event.dataset',
size: 4,
},
},
series: {
date_histogram: {
field: logParams.timestampField,
fixed_interval: params.bucketSize,
},
aggs: {
dataset: {
terms: {
field: 'event.dataset',
size: 4,
},
},
},
};
},
};
}

function processLogsOverviewAggregations(aggregations: {
stats: StatsAggregation;
series: SeriesAggregation;
}): StatsAndSeries {
const processedStats = aggregations.stats.buckets.reduce<StatsAndSeries['stats']>(
(result, bucket) => {
result[bucket.key] = {
type: 'number',
label: bucket.key,
value: bucket.doc_count,
};

return result;
},
{}
);

const processedSeries = aggregations.series.buckets.reduce<StatsAndSeries['series']>(
(result, bucket) => {
const x = bucket.key; // the timestamp of the bucket
bucket.dataset.buckets.forEach((b) => {
const label = b.key;
result[label] = result[label] || { label, coordinates: [] };
result[label].coordinates.push({ x, y: b.doc_count });
});

return result;
},
{}
);

return {
stats: processedStats,
series: processedSeries,
};
}

function normalizeStats(
stats: LogsFetchDataResponse['stats'],
timeSpanInMinutes: number
): LogsFetchDataResponse['stats'] {
return Object.keys(stats).reduce<LogsFetchDataResponse['stats']>((normalized, key) => {
normalized[key] = {
...stats[key],
value: stats[key].value / timeSpanInMinutes,
};
return normalized;
}, {});
}

function normalizeSeries(series: LogsFetchDataResponse['series']): LogsFetchDataResponse['series'] {
const seriesKeys = Object.keys(series);
const timestamps = seriesKeys.flatMap((key) => series[key].coordinates.map((c) => c.x));
const [first, second] = [...new Set(timestamps)].sort();
const timeSpanInMinutes = (second - first) / (1000 * 60);

return seriesKeys.reduce<LogsFetchDataResponse['series']>((normalized, key) => {
normalized[key] = {
...series[key],
coordinates: series[key].coordinates.map((c) => {
if (c.y) {
return { ...c, y: c.y / timeSpanInMinutes };
}
return c;
}),
};
return normalized;
}, {});
}
75 changes: 75 additions & 0 deletions x-pack/plugins/infra/public/utils/logs_overview_fetches.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { coreMock } from 'src/core/public/mocks';
import { dataPluginMock } from 'src/plugins/data/public/mocks';
import { CoreStart } from 'kibana/public';
import { getLogsHasDataFetcher } from './logs_overview_fetchers';
import { InfraClientStartDeps, InfraClientStartExports } from '../types';
import { callFetchLogSourceStatusAPI } from '../containers/logs/log_source/api/fetch_log_source_status';

// Note
// Calls to `.mock*` functions will fail the typecheck because how jest does the mocking.
// The calls will be preluded with a `@ts-expect-error`
jest.mock('../containers/logs/log_source/api/fetch_log_source_status');

function setup() {
const core = coreMock.createStart();
const data = dataPluginMock.createStartContract();

const mockedGetStartServices = jest.fn(() => {
const deps = { data };
return Promise.resolve([
core as CoreStart,
deps as InfraClientStartDeps,
void 0 as InfraClientStartExports,
]) as Promise<[CoreStart, InfraClientStartDeps, InfraClientStartExports]>;
});
return { core, mockedGetStartServices };
}

describe('Logs UI Observability Homepage Functions', () => {
describe('getLogsHasDataFetcher()', () => {
beforeEach(() => {
// @ts-expect-error
callFetchLogSourceStatusAPI.mockReset();
});
it('should return true when some index is present', async () => {
const { mockedGetStartServices } = setup();

// @ts-expect-error
callFetchLogSourceStatusAPI.mockResolvedValue({
data: { logIndexFields: [], logIndicesExist: true },
});

const hasData = getLogsHasDataFetcher(mockedGetStartServices);
const response = await hasData();

expect(callFetchLogSourceStatusAPI).toHaveBeenCalledTimes(1);
expect(response).toBe(true);
});

it('should return false when no index is present', async () => {
const { mockedGetStartServices } = setup();

// @ts-expect-error
callFetchLogSourceStatusAPI.mockResolvedValue({
data: { logIndexFields: [], logIndicesExist: false },
});

const hasData = getLogsHasDataFetcher(mockedGetStartServices);
const response = await hasData();

expect(callFetchLogSourceStatusAPI).toHaveBeenCalledTimes(1);
expect(response).toBe(false);
});
});

describe('getLogsOverviewDataFetcher()', () => {
it.skip('should work', async () => {
// Pending
});
});
});