Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use async in watch file changes #4026

Merged
merged 14 commits into from
Sep 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ILogger,
ProgressLocation,
URI,
Uri,
formatLocalize,
localize,
raceCancellation,
Expand Down Expand Up @@ -77,15 +78,15 @@ export class MainThreadFileSystemEvent extends Disposable {
for (const change of changes) {
switch (change.type) {
case FileChangeType.ADDED:
events.created.push(new URI(change.uri).codeUri);
events.created.push(Uri.parse(change.uri));
hasResult = true;
break;
case FileChangeType.UPDATED:
events.changed.push(new URI(change.uri).codeUri);
events.changed.push(Uri.parse(change.uri));
hasResult = true;
break;
case FileChangeType.DELETED:
events.deleted.push(new URI(change.uri).codeUri);
events.deleted.push(Uri.parse(change.uri));
hasResult = true;
break;
}
Expand Down
62 changes: 22 additions & 40 deletions packages/file-service/src/node/recursive/file-service-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ import paths from 'path';

import ParcelWatcher from '@parcel/watcher';
import fs from 'fs-extra';
import debounce from 'lodash/debounce';
import throttle from 'lodash/throttle';
import uniqBy from 'lodash/uniqBy';

import { Autowired, Injectable, Optional } from '@opensumi/di';
import {
Disposable,
DisposableCollection,
FRAME_FIVE,
FileUri,
IDisposable,
ILogService,
Expand All @@ -22,6 +23,7 @@ import {

import { FileChangeType, FileSystemWatcherClient, IFileSystemWatcherServer, INsfw, WatchOptions } from '../../common';
import { FileChangeCollection } from '../file-change-collection';
import { shouldIgnorePath } from '../shared';

export interface WatcherOptions {
excludesPattern: ParsedPattern[];
Expand All @@ -46,6 +48,8 @@ export interface NsfwFileSystemWatcherOption {

@Injectable({ multiple: true })
export class FileSystemWatcherServer implements IFileSystemWatcherServer {
protected readonly _disposables = new DisposableCollection();

private static readonly PARCEL_WATCHER_BACKEND = isWindows ? 'windows' : isLinux ? 'inotify' : 'fs-events';

private WATCHER_HANDLERS = new Map<
Expand All @@ -57,8 +61,6 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {

protected client: FileSystemWatcherClient | undefined;

protected readonly toDispose = new DisposableCollection(Disposable.create(() => this.setClient(undefined)));

protected changes = new FileChangeCollection();

@Autowired(ILogServiceManager)
Expand All @@ -71,7 +73,7 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
}

dispose(): void {
this.toDispose.dispose();
this._disposables.dispose();
this.WATCHER_HANDLERS.clear();
}

Expand Down Expand Up @@ -150,7 +152,7 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
});
toDisposeWatcher.push(Disposable.create(() => this.WATCHER_HANDLERS.delete(watcherId as number)));
toDisposeWatcher.push(await this.start(watcherId, watchPath, options));
this.toDispose.push(toDisposeWatcher);
this._disposables.push(toDisposeWatcher);
return watcherId;
}

Expand Down Expand Up @@ -180,24 +182,10 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
* @param events
*/
protected trimChangeEvent(events: ParcelWatcher.Event[]): ParcelWatcher.Event[] {
events = events.filter((event: ParcelWatcher.Event) => {
if (event.path) {
if (this.isTempFile(event.path)) {
// write-file-atomic 源文件xxx.xx 对应的临时文件为 xxx.xx.22243434
// 这类文件的更新应当完全隐藏掉
return false;
}
}
return true;
});

events = events.filter((event: ParcelWatcher.Event) => !shouldIgnorePath(event.path));
return events;
}

private isTempFile(path: string) {
return /\.\d{7}\d+$/.test(path);
}

private getDefaultWatchExclude() {
return ['**/.git/objects/**', '**/.git/subtree-cache/**', '**/node_modules/**/*', '**/.hg/store/**'];
}
Expand Down Expand Up @@ -251,20 +239,14 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
return undefined; // watch 失败则返回 undefined
};

/**
* 由于 parcel/watcher 在 Linux 下存在内存越界访问问题触发了 sigsegv 导致 crash,所以在 Linux 下仍旧使用 nsfw
* 社区相关 issue: https://github.com/parcel-bundler/watcher/issues/49
* 后续这里的 watcher 模块需要重构掉,先暂时这样处理
*
* 代码来自 issue: https://github.com/opensumi/core/pull/1437/files?diff=split&w=0#diff-9de963117a88a70d7c58974bf2b092c61a196d6eef719846d78ca5c9d100b796 的旧代码处理
*/
if (this.isEnableNSFW()) {
const nsfw = await this.withNSFWModule();
const watcher: INsfw.NSFW = await nsfw(
realPath,
(events: INsfw.ChangeEvent[]) => this.handleNSFWEvents(events, watcherId),
{
errorCallback: (error: any) => {
errorCallback: (err) => {
this.logger.error('NSFW watcher encountered an error and will stop watching.', err);
// see https://github.com/atom/github/issues/342
this.unwatchFileChanges(watcherId);
},
Expand Down Expand Up @@ -314,15 +296,15 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
}

setClient(client: FileSystemWatcherClient | undefined) {
if (client && this.toDispose.disposed) {
if (client && this._disposables.disposed) {
return;
}
this.client = client;
}

/**
* @deprecated
* 主要是用来跳过 jest 测试
* 由于 parcel/watcher 在 Linux 下存在内存越界访问问题触发了 sigsegv 导致 crash,所以在 Linux 下仍旧使用 nsfw
* 社区相关 issue: https://github.com/parcel-bundler/watcher/issues/49
*/
private isEnableNSFW(): boolean {
return isLinux;
Expand All @@ -347,7 +329,7 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
return true;
}

return !this.isTempFile(event.file!);
return !shouldIgnorePath(event.file);
});
// 合并下事件,由于 resolvePath 耗时较久,这里只用当前事件路径及文件名去重,后续处理事件再获取真实路径
const mergedEvents = uniqBy(filterEvents, (event) => {
Expand All @@ -362,30 +344,30 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {

for (const event of mergedEvents) {
if (event.action === INsfw.actions.RENAMED) {
const deletedPath = this.resolvePath(event.directory, event.oldFile!);
const deletedPath = await this.resolvePath(event.directory, event.oldFile!);
if (isIgnored(watcherId, deletedPath)) {
continue;
}

this.pushDeleted(deletedPath);

if (event.newDirectory) {
const path = this.resolvePath(event.newDirectory, event.newFile!);
const path = await this.resolvePath(event.newDirectory, event.newFile!);
if (isIgnored(watcherId, path)) {
continue;
}

this.pushAdded(path);
} else {
const path = this.resolvePath(event.directory, event.newFile!);
const path = await this.resolvePath(event.directory, event.newFile!);
if (isIgnored(watcherId, path)) {
continue;
}

this.pushAdded(path);
}
} else {
const path = this.resolvePath(event.directory, event.file!);
const path = await this.resolvePath(event.directory, event.file!);
if (isIgnored(watcherId, path)) {
continue;
}
Expand Down Expand Up @@ -426,16 +408,16 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
this.fireDidFilesChanged();
}

protected resolvePath(directory: string, file: string): string {
protected async resolvePath(directory: string, file: string): Promise<string> {
const path = paths.join(directory, file);
// 如果是 linux 则获取一下真实 path,以防返回的是软连路径被过滤
if (isLinux) {
try {
return fs.realpathSync.native(path);
return await fs.realpath.native(path);
} catch (_e) {
try {
// file does not exist try to resolve directory
return paths.join(fs.realpathSync.native(directory), file);
return paths.join(await fs.realpath.native(directory), file);
} catch (_e) {
// directory does not exist fall back to symlink
return path;
Expand All @@ -449,7 +431,7 @@ export class FileSystemWatcherServer implements IFileSystemWatcherServer {
* Fires file changes to clients.
* It is debounced in the case if the filesystem is spamming to avoid overwhelming clients with events.
*/
protected readonly fireDidFilesChanged: () => void = debounce(() => this.doFireDidFilesChanged(), 100);
protected readonly fireDidFilesChanged: () => void = throttle(() => this.doFireDidFilesChanged(), FRAME_FIVE);
protected doFireDidFilesChanged(): void {
const changes = this.changes.values();
this.changes = new FileChangeCollection();
Expand Down
13 changes: 13 additions & 0 deletions packages/file-service/src/node/shared/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export function shouldIgnorePath(path?: string) {
if (!path) {
return true;
}

if (/\.\d{7}\d+$/.test(path)) {
// write-file-atomic 源文件xxx.xx 对应的临时文件为 xxx.xx.22243434
// 这类文件的更新应当完全隐藏掉
return true;
}

return false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {

import { FileChangeType, FileSystemWatcherClient, IFileSystemWatcherServer } from '../../common/index';
import { FileChangeCollection } from '../file-change-collection';
import { shouldIgnorePath } from '../shared';
const { join, basename, normalize } = path;
@Injectable({ multiple: true })
export class UnRecursiveFileSystemWatcher implements IFileSystemWatcherServer {
Expand Down Expand Up @@ -96,7 +97,7 @@ export class UnRecursiveFileSystemWatcher implements IFileSystemWatcherServer {
});

watcher.on('change', (type: string, filename: string | Buffer) => {
if (this.isTemporaryFile(filename as string)) {
if (shouldIgnorePath(filename as string)) {
return;
}

Expand Down Expand Up @@ -248,15 +249,4 @@ export class UnRecursiveFileSystemWatcher implements IFileSystemWatcherServer {
}
this.client = client;
}

protected isTemporaryFile(path: string): boolean {
if (path) {
if (/\.\d{7}\d+$/.test(path)) {
// write-file-atomic 源文件xxx.xx 对应的临时文件为 xxx.xx.22243434
// 这类文件的更新应当完全隐藏掉
return true;
}
}
return false;
}
}
2 changes: 2 additions & 0 deletions packages/utils/src/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export type MaybePromise<T> = T | Promise<T> | PromiseLike<T>;
export const FRAME_ONE = 16;
export const FRAME_TWO = FRAME_ONE * 2;
export const FRAME_THREE = FRAME_ONE * 3;
export const FRAME_FOUR = FRAME_ONE * 4;
export const FRAME_FIVE = FRAME_ONE * 5;

export interface CancelablePromise<T> extends Promise<T> {
cancel(): void;
Expand Down
Loading