Skip to content

Commit

Permalink
Merge pull request #658 from brizental/1712920-rate-limit
Browse files Browse the repository at this point in the history
Bug 1712920 - Implement rate limiting in ping upload
  • Loading branch information
Beatriz Rizental authored Aug 24, 2021
2 parents 40b66d6 + 268feaf commit ca0abbc
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
* [#580](https://github.com/mozilla/glean.js/pull/580): BUGFIX: Pending pings at startup up are uploaded from oldest to newest.
* [#607](https://github.com/mozilla/glean.js/pull/607): Record an error when incoherent timestamps are calculated for events after a restart.
* [#630](https://github.com/mozilla/glean.js/pull/630): Accept booleans and numbers as event extras.
* [#658](https://github.com/mozilla/glean.js/pull/658): Implement rate limiting for ping upload.
* Only up to 15 ping submissions every 60 seconds are now allowed.
* [#658](https://github.com/mozilla/glean.js/pull/658): BUGFIX: Unblock ping uploading jobs after the maximum of upload failures are hit for a given uploading window.

# v0.18.1 (2021-07-22)

Expand Down
39 changes: 38 additions & 1 deletion glean/src/core/upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ import type PlatformInfo from "../platform_info.js";
import { UploadResult } from "./uploader.js";
import type Uploader from "./uploader.js";
import { UploadResultStatus } from "./uploader.js";
import RateLimiter, { RateLimiterState } from "./rate_limiter.js";

const LOG_TAG = "core.Upload";

// Default rate limiter interval, in milliseconds.
const RATE_LIMITER_INTERVAL_MS = 60 * 1000;
// Default max pings per internal.
const MAX_PINGS_PER_INTERVAL = 15;

/**
* Create and initialize a dispatcher for the PingUplaoder.
*
Expand Down Expand Up @@ -91,7 +97,8 @@ class PingUploader implements PingsDatabaseObserver {
config: Configuration,
platform: Platform,
private readonly pingsDatabase = Context.pingsDatabase,
private readonly policy = new Policy()
private readonly policy = new Policy(),
private readonly rateLimiter = new RateLimiter(RATE_LIMITER_INTERVAL_MS, MAX_PINGS_PER_INTERVAL)
) {
this.processing = [];
// Initialize the ping uploader with either the platform defaults or a custom
Expand Down Expand Up @@ -121,6 +128,35 @@ class PingUploader implements PingsDatabaseObserver {
// Add the ping to the list of pings being processsed.
this.processing.push(ping);

const { state: rateLimiterState, remainingTime } = this.rateLimiter.getState();
if (rateLimiterState === RateLimiterState.Incrementing) {
this.dispatcher.resume();
} else {
this.dispatcher.stop();

if (rateLimiterState === RateLimiterState.Throttled) {
log(
LOG_TAG,
[
"Attempted to upload a ping, but Glean is currently throttled.",
`Pending pings will be processed in ${(remainingTime || 0) / 1000}s.`
],
LoggingLevel.Debug
);
}
else if (rateLimiterState === RateLimiterState.Stopped) {
log(
LOG_TAG,
[
"Attempted to upload a ping, but Glean has reached maximum recoverable upload failures",
"for the current uploading window.",
`Will retry in ${(remainingTime || 0) / 1000}s.`
],
LoggingLevel.Debug
);
}
}

// If the ping is a deletion-request ping, we want to enqueue it as a persistent task,
// so that clearing the queue does not clear it.
//
Expand All @@ -143,6 +179,7 @@ class PingUploader implements PingsDatabaseObserver {
`Reached maximum recoverable failures for ping "${JSON.stringify(ping.name)}". You are done.`,
LoggingLevel.Info
);
this.rateLimiter.stop();
this.dispatcher.stop();
ping.retries = 0;
}
Expand Down
125 changes: 125 additions & 0 deletions glean/src/core/upload/rate_limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import { isUndefined, getMonotonicNow } from "../utils.js";

/**
* An enum to represent the current state of the RateLimiter.
*/
export const enum RateLimiterState {
// The RateLimiter has not reached the maximum count and is still incrementing.
Incrementing,
// The RateLimiter has not reached the maximum count, but it is also not incrementing.
Stopped,
// The RateLimiter has reached the maximum count for the current interval.
Throttled,
}

class RateLimiter {
// Whether or not the RateLimiter is not counting any further for the current interval.
// This is different from the RateLimiter being throttled, because it may happen
// even if max count for the current interval has not been reached.
private stopped = false;

constructor(
// The duration of each interval, in millisecods.
private interval: number,
// The maximum count per interval.
private maxCount: number,
// The count for the current interval.
private count: number = 0,
// The instant the current interval has started, in milliseconds.
private started?: number,
) {}

get elapsed(): number {
if (isUndefined(this.started)) {
return NaN;
}

const now = getMonotonicNow();
const elapsed = now - this.started;

// It's very unlikely elapsed will be a negative number since we are using a monotonic timer
// here, but just to be extra sure, we account for it.
if (elapsed < 0) {
return NaN;
}

return elapsed;
}

private reset(): void {
this.started = getMonotonicNow();
this.count = 0;
this.stopped = false;
}

/**
* The rate limiter should reset if
*
* 1. It has never started i.e. `started` is still `undefined`;
* 2. It has been started more than the interval time ago;
* 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
*
* @returns Whether or not this rate limiter should reset.
*/
private shouldReset(): boolean {
if (isUndefined(this.started)) {
return true;
}

if (isNaN(this.elapsed) || this.elapsed > this.interval) {
return true;
}

return false;
}

/**
* Tries to increment the internal counter.
*
* @returns The current state of the RateLimiter plus the remaining time
* (in milliseconds) until the end of the current window.
*/
getState(): {
state: RateLimiterState,
remainingTime?: number,
} {
if (this.shouldReset()) {
this.reset();
}

const remainingTime = this.interval - this.elapsed;
if (this.stopped) {
return {
state: RateLimiterState.Stopped,
remainingTime,
};
}

if (this.count >= this.maxCount) {
return {
state: RateLimiterState.Throttled,
remainingTime,
};
}

this.count++;
return {
state: RateLimiterState.Incrementing
};
}

/**
* Stops counting for the current interval, regardless of the max count being reached.
*
* The RateLimiter will still be reset when time interval is over.
*/
stop(): void {
this.stopped = true;
}
}

export default RateLimiter;
14 changes: 6 additions & 8 deletions glean/tests/unit/core/upload/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,12 @@ describe("PingUploader", function() {
assert.deepStrictEqual(Object.keys(allPings).length, 1);
});

it("duplicates are not enqueued", function() {
// Don't initialize to keep the dispatcher in an uninitialized state
// thus making sure no upload attempt is executed and we can look at the dispatcher queue.
const uploader = new PingUploader(new Configuration(), Glean.platform, Context.pingsDatabase);
// Stop the dispatcher so that pings can be enqueued but not sent.
uploader["dispatcher"].stop();
it("duplicates are not enqueued", async function() {
const httpClient = new CounterUploader();
await Glean.testResetGlean(testAppId, true, { httpClient });

for (let i = 0; i < 10; i++) {
uploader["enqueuePing"]({
Glean["pingUploader"]["enqueuePing"]({
collectionDate: (new Date()).toISOString(),
identifier: "id",
retries: 0,
Expand All @@ -163,7 +160,8 @@ describe("PingUploader", function() {
});
}

assert.strictEqual(uploader["dispatcher"]["queue"].length, 1);
await Glean["pingUploader"].testBlockOnPingsQueue();
assert.strictEqual(httpClient.count, 1);
});

it("maximum of recoverable errors is enforced", async function () {
Expand Down
91 changes: 91 additions & 0 deletions glean/tests/unit/core/upload/rate_limiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import assert from "assert";
import type { SinonFakeTimers } from "sinon";
import sinon from "sinon";

import RateLimiter, { RateLimiterState } from "../../../../src/core/upload/rate_limiter";

const sandbox = sinon.createSandbox();
const now = new Date();


describe("RateLimiter", function() {
let clock: SinonFakeTimers;

beforeEach(function() {
clock = sandbox.useFakeTimers(now.getTime());
});

afterEach(function () {
clock.restore();
});

it("rate limiter correctly resets in case elapsed time return an error", function () {
const rateLimiter = new RateLimiter(
1000, /* interval */
3, /* maxCount */
);

// Reach the count for the current interval.
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });

sinon.replaceGetter(rateLimiter, "elapsed", () => NaN);
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
});

it("rate limiter correctly resets in case interval is over", function () {
const rateLimiter = new RateLimiter(
1000, /* interval */
3, /* maxCount */
);

// Reach the count for the current interval.
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });

// Fake the time passing over the current interval
sinon.replaceGetter(rateLimiter, "elapsed", () => 1000 * 2);
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
});

it("rate limiter returns throttled state when it is throttled", function () {
const rateLimiter = new RateLimiter(
1000, /* interval */
3, /* maxCount */
);

// Reach the count for the current interval.
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });

// Try one more time and we should be throttled.
const nextState = rateLimiter.getState();
assert.strictEqual(nextState.state, RateLimiterState.Throttled);
assert.ok(nextState.remainingTime as number <= 1000 && nextState.remainingTime as number > 0);
});

it("rate limiter returns stopped state when it is stopped", function () {
const rateLimiter = new RateLimiter(
1000, /* interval */
3, /* maxCount */
);

// Don't reach the count for the current interval.
assert.deepStrictEqual(rateLimiter.getState(), { state: RateLimiterState.Incrementing });

// Stop the rate limiter
rateLimiter.stop();

// Try one more time and we should be stopped.
const nextState = rateLimiter.getState();
assert.strictEqual(nextState.state, RateLimiterState.Stopped);
assert.ok(nextState.remainingTime as number <= 1000 && nextState.remainingTime as number > 0);
});
});

0 comments on commit ca0abbc

Please sign in to comment.