Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle file events #126811

Merged
merged 4 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion src/vs/base/common/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { canceled, onUnexpectedError } from 'vs/base/common/errors';
import { Emitter, Event, Listener } from 'vs/base/common/event';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { IDisposable, toDisposable, Disposable, MutableDisposable } from 'vs/base/common/lifecycle';
import { LinkedList } from 'vs/base/common/linkedList';
import { extUri as defaultExtUri, IExtUri } from 'vs/base/common/resources';
import { URI } from 'vs/base/common/uri';
Expand Down Expand Up @@ -787,6 +787,103 @@ export class RunOnceWorker<T> extends RunOnceScheduler {
}
}

/**
* The `ThrottledWorker` will accept units of work `T`
* to handle. The contract is:
* * there is a maximum of units the worker can handle at once (via `chunkSize`)
* * after having handled units, the worker needs to rest (via `throttleDelay`)
*/
export class ThrottledWorker<T> extends Disposable {

private readonly pendingWork: T[] = [];

private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
private disposed = false;

constructor(
private readonly maxWorkChunkSize: number,
private readonly maxPendingWork: number | undefined,
private readonly throttleDelay: number,
private readonly handler: (units: readonly T[]) => void
) {
super();
}

/**
* The number of work units that are pending to be processed.
*/
get pending(): number { return this.pendingWork.length; }

/**
* Add units to be worked on. Use `pending` to figure out
* how many units are not yet processed after this method
* was called.
*
* @returns whether the work was accepted or not. If the
* worker is disposed, it will not accept any more work.
* If the number of pending units would become larger
* than `maxPendingWork`, more work will also not be accepted.
*/
work(units: readonly T[]): boolean {
if (this.disposed) {
return false; // work not accepted: disposed
}

// Check for reaching maximum of pending work
if (typeof this.maxPendingWork === 'number') {

// Throttled: simple check if pending + units exceeds max pending
if (this.throttler.value) {
if (this.pending + units.length > this.maxPendingWork) {
return false; // work not accepted: too much pending work
}
}

// Unthrottled: same as throttled, but account for max chunk getting
// worked on directly without being pending
else {
if (this.pending + units.length - this.maxWorkChunkSize > this.maxPendingWork) {
return false; // work not accepted: too much pending work
}
}
}

// Add to pending units first
this.pendingWork.push(...units);

// If not throttled, start working directly
// Otherwise, when the throttle delay has
// past, pending work will be worked again.
if (!this.throttler.value) {
this.doWork();
}

return true; // work accepted
}

private doWork(): void {

// Extract chunk to handle and handle it
this.handler(this.pendingWork.splice(0, this.maxWorkChunkSize));

// If we have remaining work, schedule it after a delay
if (this.pendingWork.length > 0) {
this.throttler.value = new RunOnceScheduler(() => {
this.throttler.clear();

this.doWork();
}, this.throttleDelay);
this.throttler.value.schedule();
}
}

override dispose(): void {
super.dispose();

this.disposed = true;
}
}

//#region -- run on idle tricks ------------

export interface IdleDeadline {
Expand Down
185 changes: 185 additions & 0 deletions src/vs/base/test/common/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,4 +1026,189 @@ suite('Async', () => {
assert.ok(p3Handled);
});
});

suite('ThrottledWorker', () => {

function assertArrayEquals(actual: unknown[], expected: unknown[]) {
assert.strictEqual(actual.length, expected.length);

for (let i = 0; i < actual.length; i++) {
assert.strictEqual(actual[i], expected[i]);
}
}

test('basics', async () => {
let handled: number[] = [];

let handledCallback: Function;
let handledPromise = new Promise(resolve => handledCallback = resolve);
let handledCounterToResolve = 1;
let currentHandledCounter = 0;

const handler = (units: readonly number[]) => {
handled.push(...units);

currentHandledCounter++;
if (currentHandledCounter === handledCounterToResolve) {
handledCallback();

handledPromise = new Promise(resolve => handledCallback = resolve);
currentHandledCounter = 0;
}
};

const worker = new async.ThrottledWorker<number>(5, undefined, 1, handler);

// Work less than chunk size

let worked = worker.work([1, 2, 3]);

assertArrayEquals(handled, [1, 2, 3]);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, true);

worker.work([4, 5]);
worked = worker.work([6]);

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6]);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, true);

// Work more than chunk size (variant 1)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 2);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7]);

handled = [];
handledCounterToResolve = 4;

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 14);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);

// Work more than chunk size (variant 2)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 5);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Work more while throttled (variant 1)

handled = [];
handledCounterToResolve = 3;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 2);
assert.strictEqual(worked, true);

worker.work([8]);
worked = worker.work([9, 10, 11]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 6);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
assert.strictEqual(worker.pending, 0);

// Work more while throttled (variant 2)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worked, true);

worker.work([8]);
worked = worker.work([9, 10]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

test('do not accept too much work', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, 5, 1, handler);

let worked = worker.work([1, 2, 3]);
assert.strictEqual(worked, true);

worked = worker.work([1, 2, 3, 4, 5, 6]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 1);

worked = worker.work([7]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 2);

worked = worker.work([8, 9, 10, 11]);
assert.strictEqual(worked, false);
assert.strictEqual(worker.pending, 2);
});

test('do not accept too much work (account for max chunk size', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, 5, 1, handler);

let worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
assert.strictEqual(worked, false);
assert.strictEqual(worker.pending, 0);

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 5);
});

test('disposed', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, undefined, 1, handler);
worker.dispose();
const worked = worker.work([1, 2, 3]);

assertArrayEquals(handled, []);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, false);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class ExtensionsWatcher extends Disposable {
const extensionsResource = URI.file(environmentService.extensionsPath);
const extUri = new ExtUri(resource => !fileService.hasCapability(resource, FileSystemProviderCapabilities.PathCaseSensitive));
this._register(fileService.watch(extensionsResource));
this._register(Event.filter(fileService.onDidFilesChange, e => e.raw.some(change => this.doesChangeAffects(change, extensionsResource, extUri)))(() => this.onDidChange()));
this._register(Event.filter(fileService.onDidChangeFilesRaw, raw => raw.some(change => this.doesChangeAffects(change, extensionsResource, extUri)))(() => this.onDidChange()));
}

private doesChangeAffects(change: IFileChange, extensionsResource: URI, extUri: ExtUri): boolean {
Expand Down
Loading