diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index fff0b742925c..ef7d3f1224fa 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -209,7 +209,7 @@ describe('BulkUploader', () => { }, CHECK_DELAY); }); - it('refetches UsageCollectors if uploading to local cluster was not successful', done => { + it('stops refetching UsageCollectors if uploading to local cluster was not successful', async () => { const usageCollectorFetch = sinon .stub() .returns({ type: 'type_usage_collector_test', result: { testData: 12345 } }); @@ -227,12 +227,52 @@ describe('BulkUploader', () => { uploader._onPayload = async () => ({ took: 0, ignored: true, errors: false }); - uploader.start(collectors); - setTimeout(() => { - uploader.stop(); - expect(usageCollectorFetch.callCount).to.be.greaterThan(1); - done(); - }, CHECK_DELAY); + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + + expect(uploader._holdSendingUsage).to.eql(true); + expect(usageCollectorFetch.callCount).to.eql(1); + }); + + it('fetches UsageCollectors once uploading to local cluster is successful again', async () => { + const usageCollectorFetch = sinon + .stub() + .returns({ type: 'type_usage_collector_test', result: { usageData: 12345 } }); + + const statsCollectorFetch = sinon + .stub() + .returns({ type: 'type_stats_collector_test', result: { statsData: 12345 } }); + + const collectors = new MockCollectorSet(server, [ + { + fetch: statsCollectorFetch, + isReady: () => true, + formatForBulkUpload: result => result, + isUsageCollector: false, + }, + { + fetch: usageCollectorFetch, + isReady: () => true, + formatForBulkUpload: result => result, + isUsageCollector: true, + }, + ]); + + const uploader = new BulkUploader({ ...server, interval: FETCH_INTERVAL }); + let bulkIgnored = true; + uploader._onPayload = async () => ({ took: 0, ignored: bulkIgnored, errors: false }); + + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + expect(uploader._holdSendingUsage).to.eql(true); + + bulkIgnored = false; + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors)); + + expect(uploader._holdSendingUsage).to.eql(false); + expect(usageCollectorFetch.callCount).to.eql(2); + expect(statsCollectorFetch.callCount).to.eql(3); }); it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => { diff --git a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index 2d81cb23b6b3..5e0d8aa4be1f 100644 --- a/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -40,8 +40,14 @@ export class BulkUploader { } this._timer = null; + // Hold sending and fetching usage until monitoring.bulk is successful. This means that we + // send usage data on the second tick. But would save a lot of bandwidth fetching usage on + // every tick when ES is failing or monitoring is disabled. + this._holdSendingUsage = false; this._interval = interval; this._lastFetchUsageTime = null; + // Limit sending and fetching usage to once per day once usage is successfully stored + // into the monitoring indices. this._usageInterval = TELEMETRY_COLLECTION_INTERVAL; this._log = { @@ -65,6 +71,29 @@ export class BulkUploader { }); } + filterCollectorSet(usageCollection) { + const successfulUploadInLastDay = + this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now(); + + return usageCollection.getFilteredCollectorSet(c => { + // this is internal bulk upload, so filter out API-only collectors + if (c.ignoreForInternalUploader) { + return false; + } + // Only collect usage data at the same interval as telemetry would (default to once a day) + if (usageCollection.isUsageCollector(c)) { + if (this._holdSendingUsage) { + return false; + } + if (successfulUploadInLastDay) { + return false; + } + } + + return true; + }); + } + /* * Start the interval timer * @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval @@ -72,31 +101,15 @@ export class BulkUploader { */ start(usageCollection) { this._log.info('Starting monitoring stats collection'); - const filterCollectorSet = _usageCollection => { - const successfulUploadInLastDay = - this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now(); - - return _usageCollection.getFilteredCollectorSet(c => { - // this is internal bulk upload, so filter out API-only collectors - if (c.ignoreForInternalUploader) { - return false; - } - // Only collect usage data at the same interval as telemetry would (default to once a day) - if (successfulUploadInLastDay && _usageCollection.isUsageCollector(c)) { - return false; - } - return true; - }); - }; if (this._timer) { clearInterval(this._timer); } else { - this._fetchAndUpload(filterCollectorSet(usageCollection)); // initial fetch + this._fetchAndUpload(this.filterCollectorSet(usageCollection)); // initial fetch } this._timer = setInterval(() => { - this._fetchAndUpload(filterCollectorSet(usageCollection)); + this._fetchAndUpload(this.filterCollectorSet(usageCollection)); }, this._interval); } @@ -146,12 +159,17 @@ export class BulkUploader { const sendSuccessful = !result.ignored && !result.errors; if (!sendSuccessful && hasUsageCollectors) { this._lastFetchUsageTime = null; + this._holdSendingUsage = true; this._log.debug( 'Resetting lastFetchWithUsage because uploading to the cluster was not successful.' ); } - if (sendSuccessful && hasUsageCollectors) { - this._lastFetchUsageTime = Date.now(); + + if (sendSuccessful) { + this._holdSendingUsage = false; + if (hasUsageCollectors) { + this._lastFetchUsageTime = Date.now(); + } } this._log.debug(`Uploaded bulk stats payload to the local cluster`); } catch (err) {