Skip to content

Commit

Permalink
[Telemetry] [Monitoring] Only retry fetching usage once monitoring.bulk upload is successful (#54309)
Browse files Browse the repository at this point in the history
* fix interval and add tests

* Update x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js

Co-Authored-By: Christiane (Tina) Heiligers <christiane.heiligers@elastic.co>

Co-authored-by: Christiane (Tina) Heiligers <christiane.heiligers@elastic.co>
  • Loading branch information
Bamieh and TinaHeiligers authored Jan 9, 2020
1 parent 0e46b24 commit a27c4c4
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ describe('BulkUploader', () => {
}, CHECK_DELAY);
});

it('refetches UsageCollectors if uploading to local cluster was not successful', done => {
it('stops refetching UsageCollectors if uploading to local cluster was not successful', async () => {
const usageCollectorFetch = sinon
.stub()
.returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });
Expand All @@ -227,12 +227,52 @@ describe('BulkUploader', () => {

uploader._onPayload = async () => ({ took: 0, ignored: true, errors: false });

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(usageCollectorFetch.callCount).to.be.greaterThan(1);
done();
}, CHECK_DELAY);
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));

expect(uploader._holdSendingUsage).to.eql(true);
expect(usageCollectorFetch.callCount).to.eql(1);
});

it('fetches UsageCollectors once uploading to local cluster is successful again', async () => {
// Separate stubs for the usage and non-usage collectors so their fetch
// call counts can be asserted independently below.
const usageCollectorFetch = sinon
.stub()
.returns({ type: 'type_usage_collector_test', result: { usageData: 12345 } });

const statsCollectorFetch = sinon
.stub()
.returns({ type: 'type_stats_collector_test', result: { statsData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: statsCollectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: false,
},
{
fetch: usageCollectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
]);

const uploader = new BulkUploader({ ...server, interval: FETCH_INTERVAL });
// Make the first bulk upload report `ignored: true` (i.e. unsuccessful);
// flipping this flag later simulates the cluster accepting uploads again.
let bulkIgnored = true;
uploader._onPayload = async () => ({ took: 0, ignored: bulkIgnored, errors: false });

// Tick 1: upload is ignored, so the uploader starts holding usage sending.
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
expect(uploader._holdSendingUsage).to.eql(true);

// Ticks 2 and 3: uploads succeed. On tick 2 the usage collector is still
// filtered out (hold active); the successful upload releases the hold, so
// tick 3 fetches usage again.
bulkIgnored = false;
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));

expect(uploader._holdSendingUsage).to.eql(false);
// Usage fetched on ticks 1 and 3 only; the stats collector ran every tick.
expect(usageCollectorFetch.callCount).to.eql(2);
expect(statsCollectorFetch.callCount).to.eql(3);
});

it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ export class BulkUploader {
}

this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick. But would save a lot of bandwidth fetching usage on
// every tick when ES is failing or monitoring is disabled.
this._holdSendingUsage = false;
this._interval = interval;
this._lastFetchUsageTime = null;
// Limit sending and fetching usage to once per day once usage is successfully stored
// into the monitoring indices.
this._usageInterval = TELEMETRY_COLLECTION_INTERVAL;

this._log = {
Expand All @@ -65,38 +71,45 @@ export class BulkUploader {
});
}

filterCollectorSet(usageCollection) {
const successfulUploadInLastDay =
this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();

return usageCollection.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (usageCollection.isUsageCollector(c)) {
if (this._holdSendingUsage) {
return false;
}
if (successfulUploadInLastDay) {
return false;
}
}

return true;
});
}

/*
* Start the interval timer
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
* @return undefined
*/
start(usageCollection) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _usageCollection => {
const successfulUploadInLastDay =
this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();

return _usageCollection.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (successfulUploadInLastDay && _usageCollection.isUsageCollector(c)) {
return false;
}
return true;
});
};

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterCollectorSet(usageCollection)); // initial fetch
this._fetchAndUpload(this.filterCollectorSet(usageCollection)); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload(filterCollectorSet(usageCollection));
this._fetchAndUpload(this.filterCollectorSet(usageCollection));
}, this._interval);
}

Expand Down Expand Up @@ -146,12 +159,17 @@ export class BulkUploader {
const sendSuccessful = !result.ignored && !result.errors;
if (!sendSuccessful && hasUsageCollectors) {
this._lastFetchUsageTime = null;
this._holdSendingUsage = true;
this._log.debug(
'Resetting lastFetchWithUsage because uploading to the cluster was not successful.'
);
}
if (sendSuccessful && hasUsageCollectors) {
this._lastFetchUsageTime = Date.now();

if (sendSuccessful) {
this._holdSendingUsage = false;
if (hasUsageCollectors) {
this._lastFetchUsageTime = Date.now();
}
}
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {
Expand Down

0 comments on commit a27c4c4

Please sign in to comment.