Skip to content

Commit

Permalink
watcher - correlate events to their requesting source (#194776)
Browse files Browse the repository at this point in the history
* watcher - emit `URI` instead of `string` for faster `fsPath` compute  (for #194341)

* wip

* more

* adopt

* some cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* implement correlation

* cleanup

* add correlation

* undo, leave for later

* tests

* tests

* tests

* tests

* tests

* log cId

* simpler correlation id

* 💄

* tests

* runs

* skip normalization

* fix tests

* tests

* fix tests

* add `createWatcher` API

* partition events in ext host

* allow custom excludes

* remove disk file change

* 💄

* 💄

* 💄

* wire in

* wire in
  • Loading branch information
bpasero authored Oct 10, 2023
1 parent 8988b0f commit 29b6943
Show file tree
Hide file tree
Showing 36 changed files with 879 additions and 389 deletions.
1 change: 1 addition & 0 deletions extensions/vscode-api-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"authSession",
"contribViewsRemote",
"contribStatusBarItems",
"createFileSystemWatcher",
"customEditorMove",
"diffCommand",
"documentFiltersExclusive",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,28 @@ suite('vscode API - workspace-watcher', () => {
}
}

teardown(assertNoRpc);
let fs: WatcherTestFs;
let disposable: vscode.Disposable;

test('createFileSystemWatcher', async function () {
const fs = new WatcherTestFs('watcherTest', false);
vscode.workspace.registerFileSystemProvider('watcherTest', fs);
function onDidWatchPromise() {
const onDidWatchPromise = new Promise<IWatchRequest>(resolve => {
fs.onDidWatch(request => resolve(request));
});

function onDidWatchPromise() {
const onDidWatchPromise = new Promise<IWatchRequest>(resolve => {
fs.onDidWatch(request => resolve(request));
});
return onDidWatchPromise;
}

return onDidWatchPromise;
}
setup(() => {
fs = new WatcherTestFs('watcherTest', false);
disposable = vscode.workspace.registerFileSystemProvider('watcherTest', fs);
});

teardown(() => {
disposable.dispose();
assertNoRpc();
});

test('createFileSystemWatcher (old style)', async function () {

// Non-recursive
let watchUri = vscode.Uri.from({ scheme: 'watcherTest', path: '/somePath/folder' });
Expand All @@ -59,4 +68,29 @@ suite('vscode API - workspace-watcher', () => {
assert.strictEqual(request.uri.toString(), watchUri.toString());
assert.strictEqual(request.options.recursive, true);
});

test('createFileSystemWatcher (new style)', async function () {

// Non-recursive
let watchUri = vscode.Uri.from({ scheme: 'watcherTest', path: '/somePath/folder' });
const watcher = vscode.workspace.createFileSystemWatcher(new vscode.RelativePattern(watchUri, '*.txt'), { excludes: ['testing'], ignoreChangeEvents: true });
let request = await onDidWatchPromise();

assert.strictEqual(request.uri.toString(), watchUri.toString());
assert.strictEqual(request.options.recursive, false);
assert.strictEqual(request.options.excludes.length, 1);
assert.strictEqual(request.options.excludes[0], 'testing');

watcher.dispose();

// Recursive
watchUri = vscode.Uri.from({ scheme: 'watcherTest', path: '/somePath/folder' });
vscode.workspace.createFileSystemWatcher(new vscode.RelativePattern(watchUri, '**/*.txt'), { excludes: ['testing'], ignoreCreateEvents: true });
request = await onDidWatchPromise();

assert.strictEqual(request.uri.toString(), watchUri.toString());
assert.strictEqual(request.options.recursive, true);
assert.strictEqual(request.options.excludes.length, 1);
assert.strictEqual(request.options.excludes[0], 'testing');
});
});
14 changes: 7 additions & 7 deletions src/vs/platform/files/common/diskFileSystemProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Disposable, IDisposable, toDisposable } from 'vs/base/common/lifecycle'
import { normalize } from 'vs/base/common/path';
import { URI } from 'vs/base/common/uri';
import { IFileChange, IFileSystemProvider, IWatchOptions } from 'vs/platform/files/common/files';
import { AbstractNonRecursiveWatcherClient, AbstractUniversalWatcherClient, IDiskFileChange, ILogMessage, INonRecursiveWatchRequest, IRecursiveWatcherOptions, isRecursiveWatchRequest, IUniversalWatchRequest, toFileChanges } from 'vs/platform/files/common/watcher';
import { AbstractNonRecursiveWatcherClient, AbstractUniversalWatcherClient, ILogMessage, INonRecursiveWatchRequest, IRecursiveWatcherOptions, isRecursiveWatchRequest, IUniversalWatchRequest, reviveFileChanges } from 'vs/platform/files/common/watcher';
import { ILogService, LogLevel } from 'vs/platform/log/common/log';

export interface IDiskFileSystemProviderOptions {
Expand Down Expand Up @@ -72,7 +72,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
private watchUniversal(resource: URI, opts: IWatchOptions): IDisposable {

// Add to list of paths to watch universally
const pathToWatch: IUniversalWatchRequest = { path: this.toFilePath(resource), excludes: opts.excludes, includes: opts.includes, recursive: opts.recursive };
const pathToWatch: IUniversalWatchRequest = { path: this.toFilePath(resource), excludes: opts.excludes, includes: opts.includes, recursive: opts.recursive, correlationId: opts.correlationId };
const remove = insert(this.universalPathsToWatch, pathToWatch);

// Trigger update
Expand Down Expand Up @@ -102,7 +102,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
// Create watcher if this is the first time
if (!this.universalWatcher) {
this.universalWatcher = this._register(this.createUniversalWatcher(
changes => this._onDidChangeFile.fire(toFileChanges(changes)),
changes => this._onDidChangeFile.fire(reviveFileChanges(changes)),
msg => this.onWatcherLogMessage(msg),
this.logService.getLevel() === LogLevel.Trace
));
Expand Down Expand Up @@ -136,7 +136,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
}

protected abstract createUniversalWatcher(
onChange: (changes: IDiskFileChange[]) => void,
onChange: (changes: IFileChange[]) => void,
onLogMessage: (msg: ILogMessage) => void,
verboseLogging: boolean
): AbstractUniversalWatcherClient;
Expand All @@ -153,7 +153,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
private watchNonRecursive(resource: URI, opts: IWatchOptions): IDisposable {

// Add to list of paths to watch non-recursively
const pathToWatch: INonRecursiveWatchRequest = { path: this.toFilePath(resource), excludes: opts.excludes, includes: opts.includes, recursive: false };
const pathToWatch: INonRecursiveWatchRequest = { path: this.toFilePath(resource), excludes: opts.excludes, includes: opts.includes, recursive: false, correlationId: opts.correlationId };
const remove = insert(this.nonRecursivePathsToWatch, pathToWatch);

// Trigger update
Expand Down Expand Up @@ -183,7 +183,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
// Create watcher if this is the first time
if (!this.nonRecursiveWatcher) {
this.nonRecursiveWatcher = this._register(this.createNonRecursiveWatcher(
changes => this._onDidChangeFile.fire(toFileChanges(changes)),
changes => this._onDidChangeFile.fire(reviveFileChanges(changes)),
msg => this.onWatcherLogMessage(msg),
this.logService.getLevel() === LogLevel.Trace
));
Expand All @@ -199,7 +199,7 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
}

protected abstract createNonRecursiveWatcher(
onChange: (changes: IDiskFileChange[]) => void,
onChange: (changes: IFileChange[]) => void,
onLogMessage: (msg: ILogMessage) => void,
verboseLogging: boolean
): AbstractNonRecursiveWatcherClient;
Expand Down
9 changes: 5 additions & 4 deletions src/vs/platform/files/common/diskFileSystemProviderClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import { canceled } from 'vs/base/common/errors';
import { Emitter, Event } from 'vs/base/common/event';
import { Disposable, DisposableStore, IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { newWriteableStream, ReadableStreamEventPayload, ReadableStreamEvents } from 'vs/base/common/stream';
import { URI, UriComponents } from 'vs/base/common/uri';
import { URI } from 'vs/base/common/uri';
import { generateUuid } from 'vs/base/common/uuid';
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
import { createFileSystemProviderError, IFileAtomicReadOptions, FileChangeType, IFileDeleteOptions, IFileOpenOptions, IFileOverwriteOptions, IFileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, IFileWriteOptions, IFileChange, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileCloneCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IStat, IWatchOptions, IFileSystemProviderError } from 'vs/platform/files/common/files';
import { createFileSystemProviderError, IFileAtomicReadOptions, IFileDeleteOptions, IFileOpenOptions, IFileOverwriteOptions, IFileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, IFileWriteOptions, IFileChange, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileCloneCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IStat, IWatchOptions, IFileSystemProviderError } from 'vs/platform/files/common/files';
import { reviveFileChanges } from 'vs/platform/files/common/watcher';

export const LOCAL_FILE_SYSTEM_CHANNEL_NAME = 'localFilesystem';

Expand Down Expand Up @@ -229,10 +230,10 @@ export class DiskFileSystemProviderClient extends Disposable implements
// for both events and errors from the watcher. So we need to
// unwrap the event from the remote and emit through the proper
// emitter.
this._register(this.channel.listen<{ resource: UriComponents; type: FileChangeType }[] | string>('fileChange', [this.sessionId])(eventsOrError => {
this._register(this.channel.listen<IFileChange[] | string>('fileChange', [this.sessionId])(eventsOrError => {
if (Array.isArray(eventsOrError)) {
const events = eventsOrError;
this._onDidChange.fire(events.map(event => ({ resource: URI.revive(event.resource), type: event.type })));
this._onDidChange.fire(reviveFileChanges(events));
} else {
const error = eventsOrError;
this._onDidWatchError.fire(error);
Expand Down
55 changes: 50 additions & 5 deletions src/vs/platform/files/common/fileService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { extUri, extUriIgnorePathCase, IExtUri, isAbsolutePath } from 'vs/base/c
import { consumeStream, isReadableBufferedStream, isReadableStream, listenStream, newWriteableStream, peekReadable, peekStream, transform } from 'vs/base/common/stream';
import { URI } from 'vs/base/common/uri';
import { localize } from 'vs/nls';
import { ensureFileSystemProviderError, etag, ETAG_DISABLED, FileChangesEvent, IFileDeleteOptions, FileOperation, FileOperationError, FileOperationEvent, FileOperationResult, FilePermission, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, hasFileAtomicReadCapability, hasFileFolderCopyCapability, hasFileReadStreamCapability, hasOpenReadWriteCloseCapability, hasReadWriteCapability, ICreateFileOptions, IFileContent, IFileService, IFileStat, IFileStatWithMetadata, IFileStreamContent, IFileSystemProvider, IFileSystemProviderActivationEvent, IFileSystemProviderCapabilitiesChangeEvent, IFileSystemProviderRegistrationEvent, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IReadFileOptions, IReadFileStreamOptions, IResolveFileOptions, IFileStatResult, IFileStatResultWithMetadata, IResolveMetadataFileOptions, IStat, IFileStatWithPartialMetadata, IWatchOptions, IWriteFileOptions, NotModifiedSinceFileOperationError, toFileOperationResult, toFileSystemProviderErrorCode, hasFileCloneCapability, TooLargeFileOperationError, hasFileAtomicDeleteCapability, hasFileAtomicWriteCapability } from 'vs/platform/files/common/files';
import { ensureFileSystemProviderError, etag, ETAG_DISABLED, FileChangesEvent, IFileDeleteOptions, FileOperation, FileOperationError, FileOperationEvent, FileOperationResult, FilePermission, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, hasFileAtomicReadCapability, hasFileFolderCopyCapability, hasFileReadStreamCapability, hasOpenReadWriteCloseCapability, hasReadWriteCapability, ICreateFileOptions, IFileContent, IFileService, IFileStat, IFileStatWithMetadata, IFileStreamContent, IFileSystemProvider, IFileSystemProviderActivationEvent, IFileSystemProviderCapabilitiesChangeEvent, IFileSystemProviderRegistrationEvent, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IReadFileOptions, IReadFileStreamOptions, IResolveFileOptions, IFileStatResult, IFileStatResultWithMetadata, IResolveMetadataFileOptions, IStat, IFileStatWithPartialMetadata, IWatchOptions, IWriteFileOptions, NotModifiedSinceFileOperationError, toFileOperationResult, toFileSystemProviderErrorCode, hasFileCloneCapability, TooLargeFileOperationError, hasFileAtomicDeleteCapability, hasFileAtomicWriteCapability, IWatchOptionsWithCorrelation, IFileSystemWatcher, IWatchOptionsWithoutCorrelation } from 'vs/platform/files/common/files';
import { readFileIntoStream } from 'vs/platform/files/common/io';
import { ILogService } from 'vs/platform/log/common/log';
import { ErrorNoTelemetry } from 'vs/base/common/errors';
Expand Down Expand Up @@ -63,7 +63,17 @@ export class FileService extends Disposable implements IFileService {
this._onDidChangeFileSystemProviderRegistrations.fire({ added: true, scheme, provider });

// Forward events from provider
providerDisposables.add(provider.onDidChangeFile(changes => this._onDidFilesChange.fire(new FileChangesEvent(changes, !this.isPathCaseSensitive(provider)))));
providerDisposables.add(provider.onDidChangeFile(changes => {
const event = new FileChangesEvent(changes, !this.isPathCaseSensitive(provider));

// Always emit any event internally
this.internalOnDidFilesChange.fire(event);

// Only emit uncorrelated events in the global `onDidFilesChange` event
if (!event.hasCorrelation()) {
this._onDidUncorrelatedFilesChange.fire(event);
}
}));
if (typeof provider.onDidWatchError === 'function') {
providerDisposables.add(provider.onDidWatchError(error => this._onDidWatchError.fire(new Error(error))));
}
Expand Down Expand Up @@ -1094,15 +1104,31 @@ export class FileService extends Disposable implements IFileService {

//#region File Watching

private readonly _onDidFilesChange = this._register(new Emitter<FileChangesEvent>());
readonly onDidFilesChange = this._onDidFilesChange.event;
private readonly internalOnDidFilesChange = this._register(new Emitter<FileChangesEvent>());

private readonly _onDidUncorrelatedFilesChange = this._register(new Emitter<FileChangesEvent>());
readonly onDidFilesChange = this._onDidUncorrelatedFilesChange.event; // global `onDidFilesChange` skips correlated events

private readonly _onDidWatchError = this._register(new Emitter<Error>());
readonly onDidWatchError = this._onDidWatchError.event;

private readonly activeWatchers = new Map<number /* watch request hash */, { disposable: IDisposable; count: number }>();

watch(resource: URI, options: IWatchOptions = { recursive: false, excludes: [] }): IDisposable {
private static WATCHER_CORRELATION_IDS = 0;

createWatcher(resource: URI, options: IWatchOptionsWithoutCorrelation): IFileSystemWatcher {
return this.watch(resource, {
...options,
// Explicitly set a correlation id so that file events that originate
// from requests from extensions are exclusively routed back to the
// extension host and not into the workbench.
correlationId: FileService.WATCHER_CORRELATION_IDS++
});
}

watch(resource: URI, options: IWatchOptionsWithCorrelation): IFileSystemWatcher;
watch(resource: URI, options?: IWatchOptionsWithoutCorrelation): IDisposable;
watch(resource: URI, options: IWatchOptions = { recursive: false, excludes: [] }): IFileSystemWatcher | IDisposable {
const disposables = new DisposableStore();

// Forward watch request to provider and wire in disposables
Expand All @@ -1125,6 +1151,25 @@ export class FileService extends Disposable implements IFileService {
}
})();

// When a correlation identifier is set, return a specific
// watcher that only emits events matching that correalation.
const correlationId = options.correlationId;
if (typeof correlationId === 'number') {
const fileChangeEmitter = disposables.add(new Emitter<FileChangesEvent>());
disposables.add(this.internalOnDidFilesChange.event(e => {
if (e.correlates(correlationId)) {
fileChangeEmitter.fire(e);
}
}));

const watcher: IFileSystemWatcher = {
onDidChange: fileChangeEmitter.event,
dispose: () => disposables.dispose()
};

return watcher;
}

return disposables;
}

Expand Down
Loading

0 comments on commit 29b6943

Please sign in to comment.