Skip to content

Commit

Permalink
Throttle file events (#126811)
Browse files Browse the repository at this point in the history
* file watcher - some code 💄

* file watcher - implement some render side throttling

* file watcher - provide an event for raw changes access

* file watcher - give explicitly watched resources higher prio
  • Loading branch information
bpasero authored Jun 21, 2021
2 parents 78946a0 + 242d33b commit b06322b
Show file tree
Hide file tree
Showing 21 changed files with 675 additions and 155 deletions.
99 changes: 98 additions & 1 deletion src/vs/base/common/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { canceled, onUnexpectedError } from 'vs/base/common/errors';
import { Emitter, Event, Listener } from 'vs/base/common/event';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { IDisposable, toDisposable, Disposable, MutableDisposable } from 'vs/base/common/lifecycle';
import { LinkedList } from 'vs/base/common/linkedList';
import { extUri as defaultExtUri, IExtUri } from 'vs/base/common/resources';
import { URI } from 'vs/base/common/uri';
Expand Down Expand Up @@ -787,6 +787,103 @@ export class RunOnceWorker<T> extends RunOnceScheduler {
}
}

/**
* The `ThrottledWorker` will accept units of work `T`
* to handle. The contract is:
* * there is a maximum of units the worker can handle at once (via `chunkSize`)
* * after having handled units, the worker needs to rest (via `throttleDelay`)
*/
export class ThrottledWorker<T> extends Disposable {

private readonly pendingWork: T[] = [];

private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
private disposed = false;

constructor(
private readonly maxWorkChunkSize: number,
private readonly maxPendingWork: number | undefined,
private readonly throttleDelay: number,
private readonly handler: (units: readonly T[]) => void
) {
super();
}

/**
* The number of work units that are pending to be processed.
*/
get pending(): number { return this.pendingWork.length; }

/**
* Add units to be worked on. Use `pending` to figure out
* how many units are not yet processed after this method
* was called.
*
* @returns whether the work was accepted or not. If the
* worker is disposed, it will not accept any more work.
* If the number of pending units would become larger
* than `maxPendingWork`, more work will also not be accepted.
*/
work(units: readonly T[]): boolean {
if (this.disposed) {
return false; // work not accepted: disposed
}

// Check for reaching maximum of pending work
if (typeof this.maxPendingWork === 'number') {

// Throttled: simple check if pending + units exceeds max pending
if (this.throttler.value) {
if (this.pending + units.length > this.maxPendingWork) {
return false; // work not accepted: too much pending work
}
}

// Unthrottled: same as throttled, but account for max chunk getting
// worked on directly without being pending
else {
if (this.pending + units.length - this.maxWorkChunkSize > this.maxPendingWork) {
return false; // work not accepted: too much pending work
}
}
}

// Add to pending units first
this.pendingWork.push(...units);

// If not throttled, start working directly
// Otherwise, when the throttle delay has
// past, pending work will be worked again.
if (!this.throttler.value) {
this.doWork();
}

return true; // work accepted
}

private doWork(): void {

// Extract chunk to handle and handle it
this.handler(this.pendingWork.splice(0, this.maxWorkChunkSize));

// If we have remaining work, schedule it after a delay
if (this.pendingWork.length > 0) {
this.throttler.value = new RunOnceScheduler(() => {
this.throttler.clear();

this.doWork();
}, this.throttleDelay);
this.throttler.value.schedule();
}
}

override dispose(): void {
super.dispose();

this.disposed = true;
}
}

//#region -- run on idle tricks ------------

export interface IdleDeadline {
Expand Down
185 changes: 185 additions & 0 deletions src/vs/base/test/common/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,4 +1026,189 @@ suite('Async', () => {
assert.ok(p3Handled);
});
});

suite('ThrottledWorker', () => {

function assertArrayEquals(actual: unknown[], expected: unknown[]) {
assert.strictEqual(actual.length, expected.length);

for (let i = 0; i < actual.length; i++) {
assert.strictEqual(actual[i], expected[i]);
}
}

test('basics', async () => {
let handled: number[] = [];

let handledCallback: Function;
let handledPromise = new Promise(resolve => handledCallback = resolve);
let handledCounterToResolve = 1;
let currentHandledCounter = 0;

const handler = (units: readonly number[]) => {
handled.push(...units);

currentHandledCounter++;
if (currentHandledCounter === handledCounterToResolve) {
handledCallback();

handledPromise = new Promise(resolve => handledCallback = resolve);
currentHandledCounter = 0;
}
};

const worker = new async.ThrottledWorker<number>(5, undefined, 1, handler);

// Work less than chunk size

let worked = worker.work([1, 2, 3]);

assertArrayEquals(handled, [1, 2, 3]);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, true);

worker.work([4, 5]);
worked = worker.work([6]);

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6]);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, true);

// Work more than chunk size (variant 1)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 2);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7]);

handled = [];
handledCounterToResolve = 4;

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 14);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);

// Work more than chunk size (variant 2)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 5);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Work more while throttled (variant 1)

handled = [];
handledCounterToResolve = 3;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 2);
assert.strictEqual(worked, true);

worker.work([8]);
worked = worker.work([9, 10, 11]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worker.pending, 6);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
assert.strictEqual(worker.pending, 0);

// Work more while throttled (variant 2)

handled = [];
handledCounterToResolve = 2;

worked = worker.work([1, 2, 3, 4, 5, 6, 7]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worked, true);

worker.work([8]);
worked = worker.work([9, 10]);

assertArrayEquals(handled, [1, 2, 3, 4, 5]);
assert.strictEqual(worked, true);

await handledPromise;

assertArrayEquals(handled, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

test('do not accept too much work', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, 5, 1, handler);

let worked = worker.work([1, 2, 3]);
assert.strictEqual(worked, true);

worked = worker.work([1, 2, 3, 4, 5, 6]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 1);

worked = worker.work([7]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 2);

worked = worker.work([8, 9, 10, 11]);
assert.strictEqual(worked, false);
assert.strictEqual(worker.pending, 2);
});

test('do not accept too much work (account for max chunk size', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, 5, 1, handler);

let worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
assert.strictEqual(worked, false);
assert.strictEqual(worker.pending, 0);

worked = worker.work([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assert.strictEqual(worked, true);
assert.strictEqual(worker.pending, 5);
});

test('disposed', async () => {
let handled: number[] = [];
const handler = (units: readonly number[]) => handled.push(...units);

const worker = new async.ThrottledWorker<number>(5, undefined, 1, handler);
worker.dispose();
const worked = worker.work([1, 2, 3]);

assertArrayEquals(handled, []);
assert.strictEqual(worker.pending, 0);
assert.strictEqual(worked, false);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class ExtensionsWatcher extends Disposable {
const extensionsResource = URI.file(environmentService.extensionsPath);
const extUri = new ExtUri(resource => !fileService.hasCapability(resource, FileSystemProviderCapabilities.PathCaseSensitive));
this._register(fileService.watch(extensionsResource));
this._register(Event.filter(fileService.onDidFilesChange, e => e.raw.some(change => this.doesChangeAffects(change, extensionsResource, extUri)))(() => this.onDidChange()));
this._register(Event.filter(fileService.onDidChangeFilesRaw, raw => raw.some(change => this.doesChangeAffects(change, extensionsResource, extUri)))(() => this.onDidChange()));
}

private doesChangeAffects(change: IFileChange, extensionsResource: URI, extUri: ExtUri): boolean {
Expand Down
Loading

0 comments on commit b06322b

Please sign in to comment.