[Monitoring] Rename TypeCollector to CollectorSet for semantics (#18987)

* [Monitoring] Rename TypeCollector to CollectorSet for semantics

* boring test changes

* usage collector renames

* rename initKibanaMonitoring => createCollectorSet

* fix lint
tsullivan authored May 15, 2018
1 parent 8b44aa8 commit 0940f57
Showing 11 changed files with 72 additions and 71 deletions.
6 changes: 3 additions & 3 deletions x-pack/plugins/monitoring/init.js
@@ -7,7 +7,7 @@
import { requireUIRoutes } from './server/routes';
import { instantiateClient } from './server/es_client/instantiate_client';
import { initMonitoringXpackInfo } from './server/init_monitoring_xpack_info';
import { initKibanaMonitoring } from './server/kibana_monitoring';
import { createCollectorSet } from './server/kibana_monitoring';

/**
* Initialize the Kibana Monitoring plugin by starting up asynchronous server tasks
@@ -35,8 +35,8 @@ export const init = (monitoringPlugin, server) => {
}

if (config.get('xpack.monitoring.kibana.collection.enabled')) {
const collector = initKibanaMonitoring(monitoringPlugin.kbnServer, server); // instantiate an object for collecting/sending metrics and usage stats
server.expose('typeCollector', collector); // expose the collector object on the server. other plugins will call typeCollector.register(typeDefinition) to define their own collection
const collectorSet = createCollectorSet(monitoringPlugin.kbnServer, server); // instantiate an object for collecting/sending metrics and usage stats
server.expose('collectorSet', collectorSet); // expose the collector set object on the server. other plugins will call collectorSet.register(collector) to define their own collection
}

monitoringPlugin.status.green('Ready');
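For context, here is a rough sketch of how another plugin could use the object exposed above. Only the 'collectorSet' expose key and the register(collector) call with a { type, fetchAfterInit, init, fetch, cleanup } shape come from this commit; the server.plugins.monitoring access path, the wrapper function, and the example type name are assumptions for illustration.

// Hypothetical consumer plugin code, not part of this commit.
export function registerMyPluginCollector(server) {
  const { collectorSet } = server.plugins.monitoring; // exposed via server.expose('collectorSet', collectorSet) above
  if (!collectorSet) {
    return; // collection is disabled, or X-Pack info was unavailable
  }
  collectorSet.register({
    type: 'my_plugin_stats', // hypothetical stats type
    fetchAfterInit: true, // fetch right after start() instead of waiting for the first interval
    init: () => {},
    fetch: () => ({ someCounter: 0 }), // data combined into the bulk payload on each interval
    cleanup: () => {}
  });
}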
@@ -6,10 +6,10 @@

import expect from 'expect.js';
import sinon from 'sinon';
import { getUsageCollector } from '../get_usage_collector';
import { getKibanaUsageCollector } from '../get_kibana_usage_collector';
import { callClusterFactory } from '../../../../../xpack_main';

describe('getUsageCollector', () => {
describe('getKibanaUsageCollector', () => {
let clusterStub;
let serverStub;
let callClusterStub;
@@ -29,7 +29,7 @@ describe('getUsageCollector', () => {
});

it('correctly defines usage collector.', () => {
const usageCollector = getUsageCollector(serverStub, callClusterStub);
const usageCollector = getKibanaUsageCollector(serverStub, callClusterStub);

expect(usageCollector.type).to.be('kibana');
expect(usageCollector.fetch).to.be.a(Function);
@@ -44,7 +44,7 @@ describe('getUsageCollector', () => {
}
});

const usageCollector = getUsageCollector(serverStub, callClusterStub);
const usageCollector = getKibanaUsageCollector(serverStub, callClusterStub);
await usageCollector.fetch();

sinon.assert.calledOnce(clusterStub.callWithInternalUser);
@@ -6,10 +6,10 @@

import expect from 'expect.js';
import sinon from 'sinon';
import { getReportingCollector } from '../get_reporting_collector';
import { getReportingUsageCollector } from '../get_reporting_usage_collector';
import { callClusterFactory } from '../../../../../xpack_main';

describe('getReportingCollector', () => {
describe('getReportingUsageCollector', () => {
let clusterStub;
let serverStub;
let callClusterStub;
@@ -36,7 +36,7 @@ describe('getReportingCollector', () => {
});

it('correctly defines reporting collector.', () => {
const reportingCollector = getReportingCollector(serverStub, callClusterStub);
const reportingCollector = getReportingUsageCollector(serverStub, callClusterStub);

expect(reportingCollector.type).to.be('reporting_stats');
expect(reportingCollector.fetch).to.be.a(Function);
@@ -19,7 +19,7 @@ const TYPES = [
/**
* Fetches saved object client counts by querying the saved object index
*/
export function getUsageCollector(server, callCluster) {
export function getKibanaUsageCollector(server, callCluster) {
return {
type: KIBANA_USAGE_TYPE,
async fetch() {
@@ -7,7 +7,7 @@
import { KIBANA_REPORTING_TYPE } from '../../../common/constants';
import { getReportingUsage } from '../../../../reporting';

export function getReportingCollector(server, callCluster) {
export function getReportingUsageCollector(server, callCluster) {
return {
type: KIBANA_REPORTING_TYPE,
fetch() {
@@ -6,30 +6,30 @@

import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG, } from '../../common/constants';
import { monitoringBulk } from './lib/monitoring_bulk';
import { startCollector } from './start_collector';
import { startCollectorSet } from './start_collector_set';

/**
* @param kbnServer {Object} manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param server {Object} HapiJS server instance
* @return {Object} TypeCollector instance to be exposed at a higher level, for other plugins to register their own type collectors
* @return {Object} CollectorSet instance to be exposed at a higher level, for other plugins to register their own type collectors
*/
export function initKibanaMonitoring(kbnServer, server) {
export function createCollectorSet(kbnServer, server) {
const mainXpackInfo = server.plugins.xpack_main.info;
const mainMonitoring = mainXpackInfo.feature('monitoring');

let collector;
let collectorSet;

if (mainXpackInfo && mainMonitoring.isAvailable() && mainMonitoring.isEnabled()) {
const client = server.plugins.elasticsearch.getCluster('admin').createClient({
plugins: [monitoringBulk]
});
collector = startCollector(kbnServer, server, client);
collectorSet = startCollectorSet(kbnServer, server, client);
} else {
server.log(
['error', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG],
'Unable to retrieve X-Pack info from the admin cluster. Kibana monitoring will be disabled until Kibana is restarted.'
);
}

return collector;
return collectorSet;
}
6 changes: 3 additions & 3 deletions x-pack/plugins/monitoring/server/kibana_monitoring/index.js
@@ -4,6 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

export { initKibanaMonitoring } from './init_kibana_monitoring';
export { getUsageCollector } from './collectors/get_usage_collector';
export { getReportingCollector } from './collectors/get_reporting_collector';
export { createCollectorSet } from './create_collector_set';
export { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
export { getReportingUsageCollector } from './collectors/get_reporting_usage_collector';
@@ -7,16 +7,16 @@
import { identity, noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { TypeCollector } from '../type_collector';
import { CollectorSet } from '../collector_set';

const DEBUG_LOG = [ 'debug', 'monitoring-ui', 'kibana-monitoring' ];
const INFO_LOG = [ 'info', 'monitoring-ui', 'kibana-monitoring' ];

const COLLECTOR_INTERVAL = 10000;
const CHECK_DELAY = 100; // can be lower than COLLECTOR_INTERVAL because the collectors use fetchAfterInit

describe('TypeCollector', () => {
describe('registers a collector and runs lifecycle events', () => {
describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let log;
let init;
let cleanup;
@@ -28,40 +28,40 @@ describe('TypeCollector', () => {
fetch = noop;
});

it('for skipping bulk upload because payload is empty', (done) => {
const collector = new TypeCollector({
it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet({
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collector.register({
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});

collector.start();
collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collector.cleanup();
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all Kibana monitoring collectors')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of empty Kibana monitoring payload')).to.be(true); // proof of skip
expect(log.calledWith(INFO_LOG, 'Stopping all Kibana monitoring collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

done(); // for async exit
}, CHECK_DELAY);
});

it('for running the bulk upload handler', (done) => {
it('should run the bulk upload handler', (done) => {
const log = sinon.spy();
const combineTypes = sinon.spy(data => {
return [
@@ -71,33 +71,33 @@
});
const onPayload = sinon.spy();

const collector = new TypeCollector({
const collectors = new CollectorSet({
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes,
onPayload
});

fetch = () => ({ testFetch: true });
collector.register({
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});

collector.start();
collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collector.cleanup();
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all Kibana monitoring collectors')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Uploading bulk Kibana monitoring payload')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Stopping all Kibana monitoring collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

// un-flattened
@@ -114,33 +114,33 @@
}, CHECK_DELAY);
});

it('logs info-level status of stopping and restarting', (done) => {
const collector = new TypeCollector({
it('should log the info-level status of stopping and restarting', (done) => {
const collectors = new CollectorSet({
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collector.register({
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});

collector.start();
expect(log.calledWith(INFO_LOG, 'Starting all Kibana monitoring collectors')).to.be(true);
collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

collector.cleanup();
expect(log.calledWith(INFO_LOG, 'Stopping all Kibana monitoring collectors')).to.be(true);
collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);

collector.start();
expect(log.calledWith(INFO_LOG, 'Starting all Kibana monitoring collectors')).to.be(true);
collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

// exit
collector.cleanup();
collectors.cleanup();
done();
});
});
@@ -15,7 +15,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
* function. Each type that gets registered defines how to fetch its own data
* and combine it into a unified payload for bulk upload.
*/
export class TypeCollector {
export class CollectorSet {

/*
* @param options.interval {Number} in milliseconds
@@ -67,7 +67,7 @@ export class TypeCollector {
*/
start() {
const initialCollectors = [];
this._log.info(`Starting all Kibana monitoring collectors`);
this._log.info(`Starting all stats collectors`);

this._collectors.forEach(collector => {
if (collector.init) {
@@ -110,14 +110,14 @@
if (payload.length > 0) {
try {
const combinedData = this._combineTypes(payload); // use the collector types combiner
this._log.debug(`Uploading bulk Kibana monitoring payload`);
this._log.debug(`Uploading bulk stats payload to the local cluster`);
this._onPayload(flatten(combinedData));
} catch(err) {
this._log.warn(err);
this._log.warn(`Unable to bulk upload the Kibana monitoring payload`);
this._log.warn(`Unable to bulk upload the stats payload to the local cluster`);
}
} else {
this._log.debug(`Skipping bulk uploading of empty Kibana monitoring payload`);
this._log.debug(`Skipping bulk uploading of an empty stats payload`);
}
}

@@ -140,7 +140,7 @@
}

cleanup() {
this._log.info(`Stopping all Kibana monitoring collectors`);
this._log.info(`Stopping all stats collectors`);

// stop fetching
clearInterval(this._timer);
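To make the CollectorSet lifecycle concrete, here is a minimal wiring sketch assembled only from the constructor options and methods visible in this diff and its tests (interval, logger, combineTypes, onPayload, register, start, cleanup); the option values and the example collector are illustrative rather than taken from the commit.

// Minimal illustrative wiring of a CollectorSet, not production configuration.
import { CollectorSet } from './lib/collector_set';

const collectorSet = new CollectorSet({
  interval: 10000, // milliseconds between fetch-and-upload cycles
  logger: (...args) => console.log(...args), // receives (tags, message), like server.log
  combineTypes: payload => payload, // identity combiner, as the tests above use
  onPayload: combined => console.log('would bulk upload:', combined) // receives the flattened, combined payload
});

collectorSet.register({
  type: 'example_stats', // hypothetical collector type
  fetchAfterInit: true,
  init: () => {},
  fetch: () => ({ exampleCounter: 1 }),
  cleanup: () => {}
});

collectorSet.start(); // logs 'Starting all stats collectors', runs init, then fetches on the interval
// ... later, e.g. when the Elasticsearch connection is lost:
collectorSet.cleanup(); // logs 'Stopping all stats collectors' and clears the interval timer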
@@ -5,11 +5,11 @@
*/

import { callClusterFactory } from '../../../xpack_main';
import { TypeCollector } from './lib/type_collector';
import { CollectorSet } from './lib/collector_set';
import { getOpsStatsCollector } from './collectors/get_ops_stats_collector';
import { getSettingsCollector } from './collectors/get_settings_collector';
import { getUsageCollector } from './collectors/get_usage_collector';
import { getReportingCollector } from './collectors/get_reporting_collector';
import { getKibanaUsageCollector } from './collectors/get_kibana_usage_collector';
import { getReportingUsageCollector } from './collectors/get_reporting_usage_collector';
import { sendBulkPayload } from './lib/send_bulk_payload';
import { getCollectorTypesCombiner } from './lib/get_collector_types_combiner';

@@ -21,12 +21,13 @@ import { getCollectorTypesCombiner } from './lib/get_collector_types_combiner';
* @param kbnServer {Object} manager of Kibana services - see `src/server/kbn_server` in Kibana core
* @param server {Object} HapiJS server instance
* @param client {Object} Dedicated ES Client with monitoringBulk plugin
* @return {Object} CollectorSet instance
*/
export function startCollector(kbnServer, server, client, _sendBulkPayload = sendBulkPayload) {
export function startCollectorSet(kbnServer, server, client, _sendBulkPayload = sendBulkPayload) {
const config = server.config();
const interval = config.get('xpack.monitoring.kibana.collection.interval');

const collector = new TypeCollector({
const collectorSet = new CollectorSet({
interval,
logger(...message) {
server.log(...message);
@@ -38,21 +38,21 @@ export function startCollector(kbnServer, server, client, _sendBulkPayload = sen
});
const callCluster = callClusterFactory(server).getCallClusterInternal();

collector.register(getUsageCollector(server, callCluster));
collector.register(getOpsStatsCollector(server));
collector.register(getSettingsCollector(server));
collector.register(getReportingCollector(server, callCluster)); // TODO: move this to Reporting init
collectorSet.register(getKibanaUsageCollector(server, callCluster));
collectorSet.register(getOpsStatsCollector(server));
collectorSet.register(getSettingsCollector(server));
collectorSet.register(getReportingUsageCollector(server, callCluster)); // TODO: move this to Reporting init

// Startup Kibana cleanly or reconnect to Elasticsearch
server.plugins.elasticsearch.status.on('green', () => {
collector.start();
collectorSet.start();
});

// If connection to elasticsearch is lost
// NOTE it is possible for the plugin status to go from red to red and trigger cleanup twice
server.plugins.elasticsearch.status.on('red', () => {
collector.cleanup();
collectorSet.cleanup();
});

return collector;
return collectorSet;
}
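One consequence of the signature above is the trailing _sendBulkPayload parameter, which defaults to sendBulkPayload and so acts as a test seam. The sketch below assumes an injected stub receives the combined payloads in place of the real bulk uploader; the wrapper function and its parameters are hypothetical, and only the startCollectorSet(kbnServer, server, client, _sendBulkPayload) signature comes from this hunk.

// Hypothetical test helper, not part of this commit. Building full doubles for
// kbnServer, server, and client is out of scope here, so they are taken as parameters.
import sinon from 'sinon';
import { startCollectorSet } from './start_collector_set';

export function startWithStubbedUploads(kbnServer, server, client) {
  const sendBulkPayloadStub = sinon.stub();
  const collectorSet = startCollectorSet(kbnServer, server, client, sendBulkPayloadStub);
  // After server.plugins.elasticsearch.status emits 'green', collection starts and
  // payloads should reach the stub rather than Elasticsearch, so a test can
  // assert on sendBulkPayloadStub.args without a live cluster.
  return { collectorSet, sendBulkPayloadStub };
}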