Skip to content

Commit

Permalink
[Monitoring] remove imports from the legacy 'elasticsearch' package (#…
Browse files Browse the repository at this point in the history
…106955) (#107079)

* do not import types from the legacy es package

* adjust tests

Co-authored-by: Mikhail Shustov <restrry@gmail.com>
  • Loading branch information
kibanamachine and mshustov authored Jul 28, 2021
1 parent 8695046 commit 11dc333
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ConfigOptions } from 'elasticsearch';
import { Logger, ICustomClusterClient, ElasticsearchClientConfig } from 'kibana/server';
// @ts-ignore
import { monitoringBulk } from '../kibana_monitoring/lib/monitoring_bulk';
Expand All @@ -18,7 +16,7 @@ import { MonitoringElasticsearchConfig } from '../config';
* Kibana itself is connected to a production cluster.
*/

type ESClusterConfig = MonitoringElasticsearchConfig & Pick<ConfigOptions, 'plugins'>;
type ESClusterConfig = MonitoringElasticsearchConfig;

export function instantiateClient(
elasticsearchConfig: MonitoringElasticsearchConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
* 2.0.
*/

import { errors } from 'elasticsearch';
import { forbidden, unauthorized } from '@hapi/boom';
import { isAuthError, handleAuthError } from './auth_errors';

describe('Error handling for 401/403 errors', () => {
it('ignores an unknown type', () => {
const err = new errors.Generic();
const err = new Error();
expect(isAuthError(err)).toBe(false);
});

Expand Down
16 changes: 8 additions & 8 deletions x-pack/plugins/monitoring/server/lib/errors/known_errors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
* 2.0.
*/

import { errors } from 'elasticsearch';
import { errors } from '@elastic/elasticsearch';
import { isKnownError, handleKnownError } from './known_errors';
import { MonitoringLicenseError } from './custom_errors';

// TODO: tests were not running and are not up to date
describe.skip('Error handling for 503 errors', () => {
it('ignores an unknown type', () => {
const err = new errors.Generic();
const err = new Error();
expect(isKnownError(err)).toBe(false);
});

it('handles ConnectionFault', () => {
const err = new errors.ConnectionFault();
it('handles ConnectionError', () => {
const err = new errors.ConnectionError();
expect(isKnownError(err)).toBe(true);

const wrappedErr = handleKnownError(err);
Expand All @@ -41,8 +41,8 @@ describe.skip('Error handling for 503 errors', () => {
});
});

it('handles NoConnections', () => {
const err = new errors.NoConnections();
it('handles NoLivingConnectionsError', () => {
const err = new errors.NoLivingConnectionsError();
expect(isKnownError(err)).toBe(true);

const wrappedErr = handleKnownError(err);
Expand All @@ -66,8 +66,8 @@ describe.skip('Error handling for 503 errors', () => {
});
});

it('handles RequestTimeout', () => {
const err = new errors.RequestTimeout();
it('handles TimeoutError', () => {
const err = new errors.TimeoutError();
expect(isKnownError(err)).toBe(true);

const wrappedErr = handleKnownError(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { get } from 'lodash';
import { SearchResponse } from 'elasticsearch';
import { ElasticsearchClient } from 'kibana/server';
import { estypes } from '@elastic/elasticsearch';
import { createQuery } from './create_query';
Expand Down Expand Up @@ -156,7 +155,7 @@ export interface BeatsArchitecture {
* @param {Object} clusterModuleSets - the object keyed by cluster UUIDs to count the unique modules
*/
export function processResults(
results: SearchResponse<BeatsStats>,
results: estypes.SearchResponse<BeatsStats>,
{
clusters,
clusterHostSets,
Expand All @@ -167,7 +166,7 @@ export function processResults(
) {
const currHits = results?.hits?.hits || [];
currHits.forEach((hit) => {
const clusterUuid = hit._source.cluster_uuid;
const clusterUuid = hit._source!.cluster_uuid;
if (clusters[clusterUuid] === undefined) {
clusters[clusterUuid] = getBaseStats();
clusterHostSets[clusterUuid] = new Set();
Expand All @@ -178,30 +177,30 @@ export function processResults(

const processBeatsStatsResults = () => {
const { versions, types, outputs } = clusters[clusterUuid];
const thisVersion = hit._source.beats_stats?.beat?.version;
const thisVersion = hit._source?.beats_stats?.beat?.version;
if (thisVersion !== undefined) {
const thisVersionAccum = versions[thisVersion] || 0;
versions[thisVersion] = thisVersionAccum + 1;
}

const thisType = hit._source.beats_stats?.beat?.type;
const thisType = hit._source?.beats_stats?.beat?.type;
if (thisType !== undefined) {
const thisTypeAccum = types[thisType] || 0;
types[thisType] = thisTypeAccum + 1;
}

const thisOutput = hit._source.beats_stats?.metrics?.libbeat?.output?.type;
const thisOutput = hit._source?.beats_stats?.metrics?.libbeat?.output?.type;
if (thisOutput !== undefined) {
const thisOutputAccum = outputs[thisOutput] || 0;
outputs[thisOutput] = thisOutputAccum + 1;
}

const thisEvents = hit._source.beats_stats?.metrics?.libbeat?.pipeline?.events?.published;
const thisEvents = hit._source?.beats_stats?.metrics?.libbeat?.pipeline?.events?.published;
if (thisEvents !== undefined) {
clusters[clusterUuid].eventsPublished += thisEvents;
}

const thisHost = hit._source.beats_stats?.beat?.host;
const thisHost = hit._source?.beats_stats?.beat?.host;
if (thisHost !== undefined) {
const hostsMap = clusterHostSets[clusterUuid];
hostsMap.add(thisHost);
Expand All @@ -210,29 +209,29 @@ export function processResults(
};

const processBeatsStateResults = () => {
const stateInput = hit._source.beats_state?.state?.input;
const stateInput = hit._source?.beats_state?.state?.input;
if (stateInput !== undefined) {
const inputSet = clusterInputSets[clusterUuid];
stateInput.names.forEach((name) => inputSet.add(name));
clusters[clusterUuid].input.names = Array.from(inputSet);
clusters[clusterUuid].input.count += stateInput.count;
}

const stateModule = hit._source.beats_state?.state?.module;
const statsType = hit._source.beats_state?.beat?.type;
const stateModule = hit._source?.beats_state?.state?.module;
const statsType = hit._source?.beats_state?.beat?.type;
if (stateModule !== undefined) {
const moduleSet = clusterModuleSets[clusterUuid];
stateModule.names.forEach((name) => moduleSet.add(statsType + '.' + name));
clusters[clusterUuid].module.names = Array.from(moduleSet);
clusters[clusterUuid].module.count += stateModule.count;
}

const stateQueue = hit._source.beats_state?.state?.queue?.name;
const stateQueue = hit._source?.beats_state?.state?.queue?.name;
if (stateQueue !== undefined) {
clusters[clusterUuid].queue[stateQueue] += 1;
}

const heartbeatState = hit._source.beats_state?.state?.heartbeat;
const heartbeatState = hit._source?.beats_state?.state?.heartbeat;
if (heartbeatState !== undefined) {
if (!clusters[clusterUuid].hasOwnProperty('heartbeat')) {
clusters[clusterUuid].heartbeat = {
Expand Down Expand Up @@ -264,7 +263,7 @@ export function processResults(
}
}

const functionbeatState = hit._source.beats_state?.state?.functionbeat;
const functionbeatState = hit._source?.beats_state?.state?.functionbeat;
if (functionbeatState !== undefined) {
if (!clusters[clusterUuid].hasOwnProperty('functionbeat')) {
clusters[clusterUuid].functionbeat = {
Expand All @@ -278,7 +277,7 @@ export function processResults(
functionbeatState.functions?.count || 0;
}

const stateHost = hit._source.beats_state?.state?.host;
const stateHost = hit._source?.beats_state?.state?.host;
if (stateHost !== undefined) {
const hostMap = clusterArchitectureMaps[clusterUuid];
const hostKey = `${stateHost.architecture}/${stateHost.os.platform}`;
Expand Down Expand Up @@ -362,11 +361,11 @@ async function fetchBeatsByType(
},
};

const { body: results } = await callCluster.search(params);
const { body: results } = await callCluster.search<BeatsStats>(params);
const hitsLength = results?.hits?.hits.length || 0;
if (hitsLength > 0) {
// further augment the clusters object with more stats
processResults(results as SearchResponse<BeatsStats>, options);
processResults(results, options);

if (hitsLength === HITS_SIZE) {
// call recursively
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { SearchResponse } from 'elasticsearch';
import { ElasticsearchClient } from 'kibana/server';
import { estypes } from '@elastic/elasticsearch';
import { INDEX_PATTERN_ELASTICSEARCH } from '../../common/constants';
Expand Down Expand Up @@ -70,8 +69,8 @@ export async function fetchElasticsearchStats(
},
};

const { body: response } = await callCluster.search(params);
return response as SearchResponse<ESClusterStats>;
const { body: response } = await callCluster.search<ESClusterStats>(params);
return response;
}

export interface ESClusterStats {
Expand All @@ -86,8 +85,8 @@ export interface ESClusterStats {
/**
* Extract the cluster stats for each cluster.
*/
export function handleElasticsearchStats(response: SearchResponse<ESClusterStats>) {
export function handleElasticsearchStats(response: estypes.SearchResponse<ESClusterStats>) {
const clusters = response.hits?.hits || [];

return clusters.map((cluster) => cluster._source);
return clusters.map((cluster) => cluster._source!);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { get } from 'lodash';
import { SearchResponse } from 'elasticsearch';
import { ElasticsearchClient } from 'kibana/server';
import { estypes } from '@elastic/elasticsearch';
import { createQuery } from './create_query';
Expand Down Expand Up @@ -149,14 +148,14 @@ interface OSData {
* Returns a map of the Cluster UUID to an {@link Object} containing the {@code count} and {@code versions} {@link Map}
*/
function groupInstancesByCluster<T extends { cluster_uuid?: string }>(
instances: Array<{ _source: T }>,
instances: Array<{ _source?: T }>,
product: string
) {
const clusterMap = new Map<string, InternalClusterMap>();

// hits are sorted arbitrarily by product UUID
instances.map((instance) => {
const clusterUuid = instance._source.cluster_uuid;
const clusterUuid = instance._source!.cluster_uuid;
const version: string | undefined = get(
instance,
`_source.${product}_stats.${product}.version`
Expand Down Expand Up @@ -276,7 +275,7 @@ export async function fetchHighLevelStats<
end: string,
product: string,
maxBucketSize: number
): Promise<SearchResponse<T>> {
): Promise<estypes.SearchResponse<T>> {
const isKibanaIndex = product === KIBANA_SYSTEM_ID;
const filters: object[] = [{ terms: { cluster_uuid: clusterUuids } }];

Expand Down Expand Up @@ -332,12 +331,12 @@ export async function fetchHighLevelStats<
},
};

const { body: response } = await callCluster.search(params, {
const { body: response } = await callCluster.search<T>(params, {
headers: {
'X-QUERY-SOURCE': TELEMETRY_QUERY_SOURCE,
},
});
return response as SearchResponse<T>;
return response;
}

/**
Expand All @@ -349,7 +348,7 @@ export async function fetchHighLevelStats<
* Returns an object keyed by the cluster UUIDs to make grouping easier.
*/
export function handleHighLevelStatsResponse(
response: SearchResponse<{ cluster_uuid?: string }>,
response: estypes.SearchResponse<{ cluster_uuid?: string }>,
product: string
): ClustersHighLevelStats {
const instances = response.hits?.hits || [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { estypes } from '@elastic/elasticsearch';
import {
getUsageStats,
combineStats,
rollUpTotals,
ensureTimeSpan,
KibanaUsageStats,
} from './get_kibana_stats';
import { SearchResponse } from 'elasticsearch';

describe('Get Kibana Stats', () => {
describe('Make a map of usage stats for each cluster', () => {
test('passes through if there are no kibana instances', () => {
const rawStats = {} as SearchResponse<KibanaUsageStats>;
const rawStats = {} as estypes.SearchResponse<KibanaUsageStats>;
expect(getUsageStats(rawStats)).toStrictEqual({});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import moment from 'moment';
import { isEmpty } from 'lodash';
import { SearchResponse } from 'elasticsearch';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from 'kibana/server';
import { KIBANA_SYSTEM_ID, TELEMETRY_COLLECTION_INTERVAL } from '../../common/constants';
import {
Expand Down Expand Up @@ -70,14 +70,14 @@ export interface KibanaStats {
/*
* @param {Object} rawStats
*/
export function getUsageStats(rawStats: SearchResponse<KibanaUsageStats>) {
export function getUsageStats(rawStats: estypes.SearchResponse<KibanaUsageStats>) {
const clusterIndexCache = new Set();
const rawStatsHits = rawStats.hits?.hits || [];

// get usage stats per cluster / .kibana index
return rawStatsHits.reduce((accum, currInstance) => {
const clusterUuid = currInstance._source.cluster_uuid;
const currUsage = currInstance._source.kibana_stats?.usage || {};
const clusterUuid = currInstance._source!.cluster_uuid;
const currUsage = currInstance._source!.kibana_stats?.usage || {};
const clusterIndexCombination = clusterUuid + currUsage.index;

// return early if usage data is empty or if this cluster/index has already been processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { SearchResponse } from 'elasticsearch';
import { ElasticsearchClient } from 'kibana/server';
import { estypes } from '@elastic/elasticsearch';
import { ESLicense } from '../../../telemetry_collection_xpack/server';
Expand Down Expand Up @@ -60,8 +59,8 @@ export async function fetchLicenses(
},
};

const { body: response } = await callCluster.search(params);
return response as SearchResponse<ESClusterStatsWithLicense>;
const { body: response } = await callCluster.search<ESClusterStatsWithLicense>(params);
return response;
}

export interface ESClusterStatsWithLicense {
Expand All @@ -73,13 +72,13 @@ export interface ESClusterStatsWithLicense {
/**
* Extract the cluster stats for each cluster.
*/
export function handleLicenses(response: SearchResponse<ESClusterStatsWithLicense>) {
export function handleLicenses(response: estypes.SearchResponse<ESClusterStatsWithLicense>) {
const clusters = response.hits?.hits || [];

return clusters.reduce(
(acc, { _source }) => ({
...acc,
[_source.cluster_uuid]: _source.license,
[_source!.cluster_uuid]: _source!.license,
}),
{}
);
Expand Down
Loading

0 comments on commit 11dc333

Please sign in to comment.