diff --git a/package.json b/package.json index 79ccf50158..25e13c60c8 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "@atom/source-map-support": "^0.3.4", "@babel/core": "7.18.6", "@electron/remote": "2.1.2", + "@parcel/watcher": "^2.5.1", "@pulsar-edit/fuzzy-native": "https://github.com/pulsar-edit/fuzzy-native.git#670c97d95ac22e4ac54aa6ebda0f2e3d3716764a", "about": "file:packages/about", "archive-view": "file:packages/archive-view", @@ -68,6 +69,7 @@ "etch": "0.14.1", "event-kit": "^2.5.3", "exception-reporting": "file:packages/exception-reporting", + "fdir": "6.4.6", "find-and-replace": "file:packages/find-and-replace", "find-parent-dir": "^0.3.0", "focus-trap": "6.3.0", diff --git a/spec/path-watcher-spec.js b/spec/path-watcher-spec.js index 993041e2cd..25ec0a4ae9 100644 --- a/spec/path-watcher-spec.js +++ b/spec/path-watcher-spec.js @@ -43,7 +43,7 @@ describe('File', () => { file.unsubscribeFromNativeChangeEvents(); fs.removeSync(filePath); closeAllWatchers(); - await stopAllWatchers(); + await watchPath.reset(); await wait(100); }); @@ -323,7 +323,7 @@ describe('watchPath', function () { afterEach(async function () { subs.dispose(); - await stopAllWatchers(); + await watchPath.reset(); }); function waitForChanges(watcher, ...fileNames) { @@ -348,123 +348,181 @@ describe('watchPath', function () { }); } - describe('watchPath()', function () { - it('resolves the returned promise when the watcher begins listening', async function () { - const rootDir = await tempMkdir('atom-fsmanager-test-'); + const WATCHER_IMPLEMENTATIONS = ['nsfw', 'parcel']; - const watcher = await watchPath(rootDir, {}, () => {}); - expect(watcher.constructor.name).toBe('PathWatcher'); - }); + for (let impl of WATCHER_IMPLEMENTATIONS) { + describe(`watchPath() (${impl})`, function () { + let disposables; + beforeEach(async () => { + jasmine.useRealClock(); + atom.config.set('core.fileSystemWatcher', impl); + // Changing the config setting will trigger an async transition to 
new + // file-watchers. This helper method lets us wait until that transition + // has finished. + await watchPath.waitForTransition(); + disposables = new CompositeDisposable(); + }); - it('reuses an existing native watcher and resolves getStartPromise immediately if attached to a running watcher', async function () { - const rootDir = await tempMkdir('atom-fsmanager-test-'); + afterEach(() => { + disposables?.dispose(); + }); - const watcher0 = await watchPath(rootDir, {}, () => {}); - const watcher1 = await watchPath(rootDir, {}, () => {}); + it('resolves the returned promise when the watcher begins listening', async function () { + const rootDir = await tempMkdir('atom-fsmanager-test-'); + const watcher = await watchPath(rootDir, {}, () => {}); + disposables.add(watcher); + expect(watcher.constructor.name).toBe('PathWatcher'); + }); - expect(watcher0.native).toBe(watcher1.native); - }); + it('respects `core.ignoredNames`', async () => { + jasmine.useRealClock(); - it("reuses existing native watchers even while they're still starting", async function () { - const rootDir = await tempMkdir('atom-fsmanager-test-'); + let existing = atom.config.get('core.ignoredNames'); + atom.config.set( + 'core.ignoredNames', + [...existing, 'some-other-dir'] + ); - const [watcher0, watcher1] = await Promise.all([ - watchPath(rootDir, {}, () => {}), - watchPath(rootDir, {}, () => {}) - ]); - expect(watcher0.native).toBe(watcher1.native); - }); + const rootDir = await tempMkdir('atom-fsmanager-test-'); - it("doesn't attach new watchers to a native watcher that's stopping", async function () { - const rootDir = await tempMkdir('atom-fsmanager-test-'); + // Create a directory that will be affected by our `core.ignoredNames` + // value. 
+ let ignoredDir = path.join(rootDir, 'some-other-dir'); + await mkdir(ignoredDir, { recursive: true }); - const watcher0 = await watchPath(rootDir, {}, () => {}); - const native0 = watcher0.native; + let spy = jasmine.createSpy(); - watcher0.dispose(); - const watcher1 = await watchPath(rootDir, {}, () => {}); + let watcher = await watchPath(rootDir, {}, spy); + disposables.add(watcher); - expect(watcher1.native).not.toBe(native0); - }); + // Writing a file to a path within an ignored directory should not + // trigger the callback… + await writeFile(path.join(ignoredDir, 'foo.txt'), 'something'); + // (file-watchers might have a debounce interval) + await wait(process.env.CI ? 3000 : 1000); + expect(spy).not.toHaveBeenCalled(); - it('reuses an existing native watcher on a parent directory and filters events', async function () { - const rootDir = await tempMkdir('atom-fsmanager-test-').then(realpath); - const rootFile = path.join(rootDir, 'rootfile.txt'); - const subDir = path.join(rootDir, 'subdir'); - const subFile = path.join(subDir, 'subfile.txt'); + // …but writing a file to a path outside of an ignored directory should + // trigger the callback. + await writeFile(path.join(rootDir, 'foo.txt'), 'something'); + // (file-watchers might have a debounce interval) + await wait(process.env.CI ? 
3000 : 1000); + expect(spy).toHaveBeenCalled(); + }); - await mkdir(subDir); + it('reuses an existing native watcher and resolves getStartPromise immediately if attached to a running watcher', async function () { + const rootDir = await tempMkdir('atom-fsmanager-test-'); - // Keep the watchers alive with an undisposed subscription - const rootWatcher = await watchPath(rootDir, {}, () => {}); - const childWatcher = await watchPath(subDir, {}, () => {}); + const watcher0 = await watchPath(rootDir, {}, () => {}); + const watcher1 = await watchPath(rootDir, {}, () => {}); - expect(rootWatcher.native).toBe(childWatcher.native); - expect(rootWatcher.native.isRunning()).toBe(true); + disposables.add(watcher0, watcher1); - const firstChanges = Promise.all([ - waitForChanges(rootWatcher, subFile), - waitForChanges(childWatcher, subFile) - ]); - await writeFile(subFile, 'subfile\n', { encoding: 'utf8' }); - await firstChanges; + expect(watcher0.native).toBe(watcher1.native); + }); - const nextRootEvent = waitForChanges(rootWatcher, rootFile); - await writeFile(rootFile, 'rootfile\n', { encoding: 'utf8' }); - await nextRootEvent; - }); + it("reuses existing native watchers even while they're still starting", async function () { + const rootDir = await tempMkdir('atom-fsmanager-test-'); - it('adopts existing child watchers and filters events appropriately to them', async function () { - const parentDir = await tempMkdir('atom-fsmanager-test-').then(realpath); - - // Create the directory tree - const rootFile = path.join(parentDir, 'rootfile.txt'); - const subDir0 = path.join(parentDir, 'subdir0'); - const subFile0 = path.join(subDir0, 'subfile0.txt'); - const subDir1 = path.join(parentDir, 'subdir1'); - const subFile1 = path.join(subDir1, 'subfile1.txt'); - - await mkdir(subDir0); - await mkdir(subDir1); - await Promise.all([ - writeFile(rootFile, 'rootfile\n', { encoding: 'utf8' }), - writeFile(subFile0, 'subfile 0\n', { encoding: 'utf8' }), - writeFile(subFile1, 'subfile 
1\n', { encoding: 'utf8' }) - ]); - - // Begin the child watchers and keep them alive - const subWatcher0 = await watchPath(subDir0, {}, () => {}); - const subWatcherChanges0 = waitForChanges(subWatcher0, subFile0); - - const subWatcher1 = await watchPath(subDir1, {}, () => {}); - const subWatcherChanges1 = waitForChanges(subWatcher1, subFile1); - - expect(subWatcher0.native).not.toBe(subWatcher1.native); - - // Create the parent watcher - const parentWatcher = await watchPath(parentDir, {}, () => {}); - const parentWatcherChanges = waitForChanges( - parentWatcher, - rootFile, - subFile0, - subFile1 - ); + const [watcher0, watcher1] = await Promise.all([ + watchPath(rootDir, {}, () => {}), + watchPath(rootDir, {}, () => {}) + ]); + expect(watcher0.native).toBe(watcher1.native); + }); + + it("doesn't attach new watchers to a native watcher that's stopping", async function () { + const rootDir = await tempMkdir('atom-fsmanager-test-'); + + const watcher0 = await watchPath(rootDir, {}, () => {}); + const native0 = watcher0.native; - expect(subWatcher0.native).toBe(parentWatcher.native); - expect(subWatcher1.native).toBe(parentWatcher.native); - - // Ensure events are filtered correctly - await Promise.all([ - appendFile(rootFile, 'change\n', { encoding: 'utf8' }), - appendFile(subFile0, 'change\n', { encoding: 'utf8' }), - appendFile(subFile1, 'change\n', { encoding: 'utf8' }) - ]); - - await Promise.all([ - subWatcherChanges0, - subWatcherChanges1, - parentWatcherChanges - ]); + watcher0.dispose(); + const watcher1 = await watchPath(rootDir, {}, () => {}); + + expect(watcher1.native).not.toBe(native0); + }); + + it('reuses an existing native watcher on a parent directory and filters events', async function () { + const rootDir = await tempMkdir('atom-fsmanager-test-').then(realpath); + const rootFile = path.join(rootDir, 'rootfile.txt'); + const subDir = path.join(rootDir, 'subdir'); + const subFile = path.join(subDir, 'subfile.txt'); + + await mkdir(subDir); + + // 
Keep the watchers alive with an undisposed subscription + const rootWatcher = await watchPath(rootDir, {}, () => {}); + const childWatcher = await watchPath(subDir, {}, () => {}); + + expect(rootWatcher.native).toBe(childWatcher.native); + expect(rootWatcher.native.isRunning()).toBe(true); + + const firstChanges = Promise.all([ + waitForChanges(rootWatcher, subFile), + waitForChanges(childWatcher, subFile) + ]); + await writeFile(subFile, 'subfile\n', { encoding: 'utf8' }); + await firstChanges; + + const nextRootEvent = waitForChanges(rootWatcher, rootFile); + await writeFile(rootFile, 'rootfile\n', { encoding: 'utf8' }); + await nextRootEvent; + }); + + it('adopts existing child watchers and filters events appropriately to them', async function () { + const parentDir = await tempMkdir('atom-fsmanager-test-').then(realpath); + + // Create the directory tree + const rootFile = path.join(parentDir, 'rootfile.txt'); + const subDir0 = path.join(parentDir, 'subdir0'); + const subFile0 = path.join(subDir0, 'subfile0.txt'); + const subDir1 = path.join(parentDir, 'subdir1'); + const subFile1 = path.join(subDir1, 'subfile1.txt'); + + await mkdir(subDir0); + await mkdir(subDir1); + await Promise.all([ + writeFile(rootFile, 'rootfile\n', { encoding: 'utf8' }), + writeFile(subFile0, 'subfile 0\n', { encoding: 'utf8' }), + writeFile(subFile1, 'subfile 1\n', { encoding: 'utf8' }) + ]); + + // Begin the child watchers and keep them alive + const subWatcher0 = await watchPath(subDir0, {}, () => {}); + const subWatcherChanges0 = waitForChanges(subWatcher0, subFile0); + + const subWatcher1 = await watchPath(subDir1, {}, () => {}); + const subWatcherChanges1 = waitForChanges(subWatcher1, subFile1); + + expect(subWatcher0.native).not.toBe(subWatcher1.native); + + // Create the parent watcher + const parentWatcher = await watchPath(parentDir, {}, () => {}); + const parentWatcherChanges = waitForChanges( + parentWatcher, + rootFile, + subFile0, + subFile1 + ); + + 
expect(subWatcher0.native).toBe(parentWatcher.native); + expect(subWatcher1.native).toBe(parentWatcher.native); + + // Ensure events are filtered correctly + await Promise.all([ + appendFile(rootFile, 'change\n', { encoding: 'utf8' }), + appendFile(subFile0, 'change\n', { encoding: 'utf8' }), + appendFile(subFile1, 'change\n', { encoding: 'utf8' }) + ]); + + await Promise.all([ + subWatcherChanges0, + subWatcherChanges1, + parentWatcherChanges + ]); + }); }); - }); + } + }); diff --git a/src/atom-environment.js b/src/atom-environment.js index 52ff32268f..5379a60193 100644 --- a/src/atom-environment.js +++ b/src/atom-environment.js @@ -468,6 +468,7 @@ class AtomEnvironment { // need other disposing objects to be able to check it. We won't need to // reset it because another environment will be created. this.isDestroying = true; + this.emitter.emit('will-destroy'); this.disposables.dispose(); if (this.workspace) this.workspace.destroy(); @@ -482,6 +483,20 @@ class AtomEnvironment { this.uninstallWindowEventHandler(); } + /** + * @memberof AtomEnvironment + * @function onWillDestroy + * @desc Invoke the given callback when the environment is destroying, as + * happens during window close or reload. + * @param {function} callback - Function to be called when the environment is + * destroying. + * @returns {Disposable} on which `.dispose()` can be called to unsubscribe. + * @category Event Subscription + */ + onWillDestroy (callback) { + return this.emitter.on('will-destroy', callback); + } + /** * @memberof AtomEnvironment * @function onDidBeep diff --git a/src/config-schema.js b/src/config-schema.js index a46ba18b50..bdfaef4f18 100644 --- a/src/config-schema.js +++ b/src/config-schema.js @@ -1,6 +1,7 @@ // This is loaded by atom-environment.coffee. See -// https://atom.io/docs/api/latest/Config for more information about config TODO: Link to Pulsar API site when documented -// schemas. 
+// https://atom.io/docs/api/latest/Config for more information about config +// +// TODO: Link to Pulsar API site when documented schemas. const configSchema = { core: { type: 'object', @@ -347,13 +348,21 @@ const configSchema = { }, fileSystemWatcher: { description: - 'Choose the underlying implementation used to watch for filesystem changes. Emulating changes will miss any events caused by applications other than Pulsar, but may help prevent crashes or freezes.', + 'Choose the underlying implementation used to watch for filesystem changes. It’s best to let Pulsar manage this, but you can change this value if you want to opt into a specific watcher that may work better for your platform.', type: 'string', - default: 'native', + default: 'default', enum: [ { - value: 'native', - description: 'Native operating system APIs' + value: 'default', + description: 'Default (let Pulsar decide)' + }, + { + value: 'nsfw', + description: 'Node Sentinel File Watcher' + }, + { + value: 'parcel', + description: '@parcel/watcher' } ] }, diff --git a/src/nsfw-watcher-worker.js b/src/nsfw-watcher-worker.js new file mode 100644 index 0000000000..51993388b1 --- /dev/null +++ b/src/nsfw-watcher-worker.js @@ -0,0 +1,211 @@ +/* global emit */ + +const nsfw = require('nsfw'); +const minimatch = require('minimatch'); +const { fdir } = require('fdir'); +const path = require('path'); + +const ACTION_MAP = new Map([ + [nsfw.actions.MODIFIED, 'modified'], + [nsfw.actions.CREATED, 'created'], + [nsfw.actions.DELETED, 'deleted'], + [nsfw.actions.RENAMED, 'renamed'] +]); + +// Organizes watchers by unique ID. 
+const WATCHERS_BY_PATH = new Map(); + +function onError(instance, err) { + emit('watcher:error', { id: instance, error: err.message }); +} + +function handler(instance, events) { + let normalizedEvents = events.map((event) => { + const action = + ACTION_MAP.get(event.action) || `unexpected (${event.action})`; + const payload = { action }; + + if (event.file) { + payload.path = path.join(event.directory, event.file); + } else { + payload.oldPath = path.join( + event.directory, + typeof event.oldFile === 'undefined' ? '' : event.oldFile + ); + payload.path = path.join( + event.directory, + typeof event.newFile === 'undefined' ? '' : event.newFile + ); + } + + return payload; + }); + + console.log('File events:', normalizedEvents) + + emit('watcher:events', { + id: instance, + events: normalizedEvents + }); +} + +// A shim over the real `console` methods so that they send log messages back +// to the renderer process instead of making us dig into their own console. +const console = { + enabled: false, + log(...args) { + if (!this.enabled) return; + emit('console:log', ['nsfw-worker', ...args]); + }, + warn(...args) { + if (!this.enabled) return; + emit('console:warn', ['nsfw-worker', ...args]); + }, + error(...args) { + // Send errors whether logging is enabled or not. + emit('console:error', ['nsfw-worker', ...args]); + } +}; + +// Given a root path and a list of globs, generates a list of excluded paths to +// pass to the `nsfw` watcher. +// +// This is _painful_! It drives us nuts because what we really want is to give +// these globs to `nsfw` and have it use them when adding a recursive watcher. +// (On Linux, it does this by spidering its way through the descendant folders +// and adding `inotify` watches on each, but it should ignore some altogether!) +// +// But `nsfw` doesn't take globs; it takes explicit absolute paths. So we have +// to do the filesystem crawling ourselves. 
+// +// If we were running this watcher on the renderer process, we'd have to worry +// about scheduling this work so as not to lock up the process. But we can more +// easily afford to go synchronous here because we're on a worker process. +async function buildExcludedPaths(normalizedPath, ignoredNames) { + let results = []; + let _totalTimeSpentMinimatching = 0; + let start = new Date().valueOf(); + console.log('Beginning generation of exclusions', normalizedPath, ignoredNames, performance.now()); + await new fdir() + .withDirs() + .onlyDirs() + // Treat symlinks as though they're genuinely in the places they pretend to + // be. + .withSymlinks({ resolvePaths: false }) + .exclude((_, dirPath) => { + // This is a trick. Returning `true` from this handler will prevent the + // filesystem crawler from diving any deeper down this path. That's what + // we want for each directory that matches any of our globs. So we + // assemble the results at the same time that we prevent further crawling + // for a certain path. + let start = performance.now(); + let matches = ignoredNames.some(pattern => minimatch(dirPath, pattern, { matchBase: true })) + let stop = performance.now(); + _totalTimeSpentMinimatching += (stop - start); + if (matches) { + results.push(dirPath); + return true; + } + }) + // Don't actually return any results, since we compile our exclusions a + // different way. This probably doesn't help much, but no sense in building + // a big array full of paths when we're not going to use it. + .filter(() => false) + .crawl(normalizedPath) + // We could go synchronous here because we're in a worker and it won't lock + // up the renderer process. But some tests suggest that this actually + // finishes faster if we let it go async. 
+ .withPromise(); + + let end = new Date().valueOf(); + + console.log('Generated exclusions in', end - start, 'ms', 'with time spent minimatching:', _totalTimeSpentMinimatching); + let excludedPaths = results; + return excludedPaths; +} + +// Reacts to messages sent by the renderer. +async function handleMessage(message) { + let { id, event = null, args } = JSON.parse(message); + console.log('handleMessage:', id, event, args); + switch (event) { + case 'watcher:watch': { + // `instance` is a unique ID for the watcher instance. We use it when we + // push filesystem events so that they can be routed back to the correct + // instance. + let { normalizedPath, instance, ignored } = args; + console.log('handling watcher:watch with normalizedPath', normalizedPath, 'and ignored', ignored); + let wrappedHandler = (err, events) => handler(instance, err, events); + try { + let excludedPaths = await buildExcludedPaths(normalizedPath, ignored); + console.log('Excluded paths:', instance, excludedPaths); + + let watcher = await nsfw(normalizedPath, wrappedHandler, { + debounceMS: 200, + errorCallback: (error) => onError(instance, error), + excludedPaths + }); + await watcher.start(); + WATCHERS_BY_PATH.set(instance, watcher); + emit('watcher:reply', { id, args: instance }); + } catch (err) { + console.error('Error trying to watch path:', normalizedPath, err.message); + emit('watcher:reply', { id, error: err.message }); + } + break; + } + case 'watcher:update': { + let { normalizedPath, instance, ignored } = args; + /** @type {nsfw.NSFW} */ + let watcher = WATCHERS_BY_PATH.get(instance); + let excludedPaths = await buildExcludedPaths(normalizedPath, ignored); + await watcher.updateExcludedPaths(excludedPaths); + emit('watcher:reply', { id, args: instance }); + break; + } + case 'watcher:unwatch': { + let { instance } = args; + let watcher = WATCHERS_BY_PATH.get(instance); + if (watcher) { + await watcher.stop(); + } + emit('watcher:reply', { id, args: instance }); + break; + } + 
default: { + console.warn(`Unrecognized event:`, event); + } + } +} + +function run() { + // Run a no-op on an interval just to keep the task alive. + setInterval(() => {}, 10000); + process.on('message', handleMessage); + console.log('nsfw worker starting'); + emit('watcher:ready'); +} + +process.on('uncaughtException', (error) => { + // Dilemma: most of the things that can cause exceptions in this worker are + // things that prevent us from communicating the error to anything — e.g., + // ERR_IPC_CHANNEL_CLOSED. + // + // The goal here is to try to emit the exception and then fall back to + // exiting the process no matter what. But `uncaughtException` is + // unrecoverable and we shouldn't try to keep the worker going; we should + // just try to gather forensic data while we have the chance. + // + // See also: + // https://github.com/AtomLinter/linter-eslint-node/blob/main/lib/worker.js#L413-L429 + try { + console.error(error?.message ?? error); + } finally { + // eslint-disable-next-line no-process-exit + process.exit(1); + } +}); + +process.title = `Pulsar file watcher worker (NSFW) [PID: ${process.pid}]`; + +module.exports = run; diff --git a/src/parcel-watcher-worker.js b/src/parcel-watcher-worker.js new file mode 100644 index 0000000000..c4bfd084bf --- /dev/null +++ b/src/parcel-watcher-worker.js @@ -0,0 +1,187 @@ +/* global emit */ + +// A worker script for `@parcel/watcher`. Runs as a `Task` (see src/task.js). +// +// Manages any number of individual folder watchers in a single process, +// communicating over IPC. +// +// Requests to watch files (rather than directories) are handled via Node's +// builtin `fs.watch` API. 
+ +const watcher = require("@parcel/watcher"); +const fs = require('fs'); + +const EVENT_MAP = { + update: 'updated', + delete: 'deleted', + create: 'created', + rename: 'renamed', + change: 'updated' +}; + +// A class designed to imitate the object that is returned by `@parcel/watcher` +// when it watches directories; this one is for when we watch individual files. +class FileHandle { + constructor(controller) { + this.controller = controller; + } + + // Async to match `@parcel/watcher`’s API. + async unsubscribe() { + return this.controller.abort(); + } +} + +// Reacts to events on individual files and sends batches back to the renderer +// process. +function fileHandler(instance, eventType, normalizedPath) { + let action = EVENT_MAP[eventType] ?? `unexpected (${eventType})`; + let payload = { action, path: normalizedPath }; + + console.log('Sending events:', [payload]); + + emit('watcher:events', { + id: instance, + events: [payload] + }); +} + +// Reacts to filesystem events and sends batches back to the renderer process. +function handler(instance, err, events) { + if (err) { + emit('watcher:error', { id: instance, error: err.message }); + return; + } + + let normalizedEvents = events.map(event => { + let action = EVENT_MAP[event.type] ?? `unexpected (${event.type})`; + let payload = { action }; + if (event.path) { + payload.path = event.path; + } + return payload; + }); + + console.log('Sending events:', events); + + emit('watcher:events', { + id: instance, + events: normalizedEvents + }); +} + + +// Organizes watchers by unique ID. +const WATCHERS_BY_PATH = new Map(); + +// A shim over the real `console` methods so that they send log messages back +// to the renderer process instead of making us dig into their own console. 
+const console = { + enabled: false, + log(...args) { + if (!this.enabled) return; + emit('console:log', ['parcel-worker', ...args]); + }, + warn(...args) { + if (!this.enabled) return; + emit('console:warn', ['parcel-worker', ...args]); + }, + error(...args) { + // Send errors whether logging is enabled or not. + emit('console:error', ['parcel-worker', ...args]); + } +}; + +// Reacts to messages sent by the renderer. +async function handleMessage(message) { + let { id, event = null, args } = JSON.parse(message); + switch (event) { + case 'watcher:watch': // fallthrough + case 'watcher:update': { + // `instance` is a unique ID for the watcher instance. We use it when we + // push filesystem events so that they can be routed back to the correct + // instance. + let { normalizedPath, instance, ignored = [] } = args; + // If this instance already exists, then the worker will call + // `watcher:update` if it wants to change the exclusions. In this worker, + // the two commands have the same effect. If there already was a watcher + // for this instance, we hold onto the existing watcher until the new one + // has started. 
+ let existing = WATCHERS_BY_PATH.get(instance); + let wrappedHandler = (err, events) => handler(instance, err, events); + let wrappedFileHandler = (eventType, _) => fileHandler(instance, eventType, normalizedPath); + try { + let ignore = ignored.reduce((prev, ignoredName) => { + prev.push(`${ignoredName}`, `**/${ignoredName}`); + return prev; + }, []); + console.log('Generated ignore globs:', ignore); + if (fs.lstatSync(normalizedPath).isDirectory()) { + let handle = await watcher.subscribe(normalizedPath, wrappedHandler, { + ignore + }); + WATCHERS_BY_PATH.set(instance, handle); + } else { + console.log('Watching file path:', normalizedPath); + let controller = new AbortController(); + fs.watch(normalizedPath, { signal: controller.signal }, wrappedFileHandler); + WATCHERS_BY_PATH.set(instance, new FileHandle(controller)); + } + if (existing) { + // If there was a pre-existing watcher at this instance, we wait + // until the new one is up and running before stopping this one. + await existing.unsubscribe(); + } + emit('watcher:reply', { id, args: instance }); + } catch (err) { + console.error('Error trying to watch path:', normalizedPath, err.message); + emit('watcher:reply', { id, error: err.message }); + } + break; + } + case 'watcher:unwatch': { + let { instance } = args; + let handle = WATCHERS_BY_PATH.get(instance); + if (handle) { + await handle.unsubscribe(); + } + emit('watcher:reply', { id, args: instance }); + break; + } + default: { + console.warn(`Unrecognized event:`, event); + } + } +} + +function run() { + // Run a no-op on an interval just to keep the task alive. + setInterval(() => {}, 10000); + console.log('@parcel/watcher worker starting'); + process.on('message', handleMessage); + emit('watcher:ready'); +} + +process.on('uncaughtException', (error) => { + // Dilemma: most of the things that can cause exceptions in this worker are + // things that prevent us from communicating the error to anything — e.g., + // ERR_IPC_CHANNEL_CLOSED. 
+ // + // The goal here is to try to emit the exception and then fall back to + // exiting the process no matter what. But `uncaughtException` is + // unrecoverable and we shouldn't try to keep the worker going; we should + // just try to gather forensic data while we have the chance. + // + // See also: + // https://github.com/AtomLinter/linter-eslint-node/blob/main/lib/worker.js#L413-L429 + try { + console.error(error?.message ?? error); + } finally { + // eslint-disable-next-line no-process-exit + process.exit(1); + } +}); + +process.title = `Pulsar file watcher worker (Parcel) [PID: ${process.pid}]`; + +module.exports = run; diff --git a/src/path-watcher.js b/src/path-watcher.js index f09fcef14c..24fd235bff 100644 --- a/src/path-watcher.js +++ b/src/path-watcher.js @@ -1,9 +1,11 @@ const fs = require('fs'); const path = require('path'); +const crypto = require('crypto'); const { Emitter, Disposable, CompositeDisposable } = require('event-kit'); const nsfw = require('nsfw'); const { NativeWatcherRegistry } = require('./native-watcher-registry'); +const Task = require('./task'); // Private: Associate native watcher action flags with descriptive String // equivalents. @@ -168,45 +170,396 @@ class NativeWatcher { } } -// Private: Implement a native watcher by translating events from an NSFW -// watcher. +// A file-watcher implementation that uses `@parcel/watcher`. +// +// We briefly experimented with importing it directly into the renderer +// process, but it caused crashes on window reload for reasons that haven't +// been fully tracked down. That's fine, though; we can run it in its own +// long-running task, much like VS Code does. +class ParcelWatcherNativeWatcher extends NativeWatcher { + static task = new Task(require.resolve('./parcel-watcher-worker.js')); + + // Whether the task has been started. + static started = false; + + // Whether the task has had its listeners attached. + static initialized = false; + + // Job IDs for request/response cycles. 
+ static PROMISE_META = new Map(); + + // All instances of this watcher organized by unique ID. + static INSTANCES = new Map(); + + static register(instance) { + this.initialize(); + this.INSTANCES.set(instance.id, instance); + } + + static unregister(instance) { + this.INSTANCES.delete(instance.id); + if (this.INSTANCES.size === 0) { + this.task.terminate(); + this.started = false; + this.initialized = false; + // Once a task is terminated, it cannot be started again. We have to + // replace it with a new instance. + this.task = new Task(require.resolve('./parcel-watcher-worker.js')); + this.PROMISE_META.clear(); + this.initialize(); + } + } + + static initialize() { + if (this.initialized) return; + + // Emitted when the worker responds to a method call. + this.task.on('watcher:reply', ({ id, args, error }) => { + let meta = this.PROMISE_META.get(id); + if (!meta) return; + if (error) { + meta.reject(new Error(error)); + } else { + meta.resolve(args); + } + this.PROMISE_META.delete(id); + }); + + // Emitted when the worker pushes events. + this.task.on('watcher:events', ({ id, events }) => { + let instance = this.INSTANCES.get(id); + instance?.onEvents(events); + }); + + // Emitted when the worker pushes a watcher error. + this.task.on('watcher:error', ({ id, error }) => { + let instance = this.INSTANCES.get(id); + instance?.onError(new Error(error)); + }); + + // Emitted when the worker is created and ready to receive method calls. + this.task.on('watcher:ready', () => { + this.PROMISE_META.get('self:start')?.resolve?.(); + }); + + // Logging from the worker. + this.task.on('console:log', (args) => { + console.log(...args); + }); + + this.task.on('console:warn', (args) => { + console.warn(...args); + }); + + this.task.on('console:error', (args) => { + console.error(...args); + }); + + this.initialized = true; + } + + static async startTask() { + // This is an unusual one-off task, so we'll use a special key for its + // promise metadata. 
+ let meta = this.PROMISE_META.get('self:start'); + if (!meta) { + meta = {}; + let promise = new Promise((resolve, reject) => { + meta.resolve = resolve; + meta.reject = reject; + this.task.start(); + }); + meta.promise = promise; + this.PROMISE_META.set('self:start', meta); + } + await meta.promise; + this.started = true; + } + + static async sendEvent(event, args) { + let id = this.getID(); + let bundle = { id, event, args }; + let meta = {}; + let promise = new Promise((resolve, reject) => { + meta.resolve = resolve; + meta.reject = reject; + }); + meta.promise = promise; + this.PROMISE_META.set(id, meta); + this.task.send(JSON.stringify(bundle)); + return await promise; + } + + // Both instances and jobs have randomly-generated IDs. We use the job IDs + // for standard request/response cycles initiated by the renderer. We use + // the instance IDs for worker-initiated pushes that are routed directly + // to the corresponding watcher instance. + static getID() { + let id; + // Generate an ID that does not collide with any other IDs we're currently + // using. + do { + id = crypto.randomBytes(5).toString('hex'); + } while (this.INSTANCES.has(id) || this.PROMISE_META.has(id)); + return id; + } + + dispose() { + super.dispose(); + this.constructor.unregister(this); + } + + constructor(...args) { + super(...args); + this.id = this.constructor.getID(); + + this.subs.add( + atom.config.observe('core.ignoredNames', (newValue) => { + this.setIgnoredNames(newValue); + }) + ); + } + + async send(event, args) { + await this.constructor.sendEvent(event, args); + } + + setIgnoredNames(ignoredNames) { + this.ignoredNames = ignoredNames; + if (this.state === WATCHER_STATE.RUNNING) { + // This watcher can't update ignores after starting, but that's its own + // implementation detail to work out. It will respond to this command by + // creating a new watcher, waiting for it to start, then stopping the old + // one. 
+ this.send('watcher:update', { + normalizedPath: this.normalizedPath, + instance: this.id, + ignored: this.ignoredNames + }); + } + } + + async doStart() { + // “Registration” would ordinarily happen earlier in the lifecycle of this + // instance. But (a) the purpose of it is to make the constructor know + // about our ID so it can funnel events to us, which isn't necessary until + // the watcher action starts; (b) if we register just before starting a + // watcher and unregister just after ending a watcher, we get to use it as + // a sort of reference-counting. That helps us know when the task itself + // can be killed. + this.constructor.register(this); + if (!ParcelWatcherNativeWatcher.started) { + await ParcelWatcherNativeWatcher.startTask(); + } + + return await this.send('watcher:watch', { + normalizedPath: this.normalizedPath, + instance: this.id, + ignored: this.ignoredNames + }); + } + + async doStop() { + let result = await this.send('watcher:unwatch', { + normalizedPath: this.normalizedPath, + instance: this.id + }); + this.constructor.unregister(this); + return result; + } +} + +// A file-watcher implementation that uses `nsfw`. Runs in a separate process. class NSFWNativeWatcher extends NativeWatcher { - async doStart(_rootPath, _eventCallback, _errorCallback) { - const handler = events => { - this.onEvents( - events.map(event => { - const action = - ACTION_MAP.get(event.action) || `unexpected (${event.action})`; - const payload = { action }; - - if (event.file) { - payload.path = path.join(event.directory, event.file); - } else { - payload.oldPath = path.join( - event.directory, - typeof event.oldFile === 'undefined' ? '' : event.oldFile - ); - payload.path = path.join( - event.directory, - typeof event.newFile === 'undefined' ? '' : event.newFile - ); - } - - return payload; - }) - ); - }; + static task = new Task(require.resolve('./nsfw-watcher-worker.js')); + + // Whether the task has been started. 
+ static started = false; + + // Whether the task has had its listeners attached. + static initialized = false; + + // Job IDs for request/response cycles. + static PROMISE_META = new Map(); + + // All instances of this watcher organized by unique ID. + static INSTANCES = new Map(); + + static register(instance) { + this.initialize(); + this.INSTANCES.set(instance.id, instance); + } + + static unregister(instance) { + this.INSTANCES.delete(instance.id); + if (this.INSTANCES.size === 0) { + this.task.terminate(); + this.started = false; + this.initialized = false; + // Once a task is terminated, it cannot be started again. We have to + // replace it with a new instance. + this.task = new Task(require.resolve('./nsfw-watcher-worker.js')); + this.PROMISE_META.clear(); + this.initialize(); + } + } + + static initialize() { + if (this.initialized) return; - this.watcher = await nsfw(this.normalizedPath, handler, { - debounceMS: 100, - errorCallback: this.onError + // Emitted when the worker responds to a method call. + this.task.on('watcher:reply', ({ id, args, error }) => { + let meta = this.PROMISE_META.get(id); + if (!meta) return; + if (error) { + meta.reject(new Error(error)); + } else { + meta.resolve(args); + } + this.PROMISE_META.delete(id); + }); + + // Emitted when the worker pushes events. + this.task.on('watcher:events', ({ id, events }) => { + let instance = this.INSTANCES.get(id); + instance?.onEvents(events); + }); + + // Emitted when the worker pushes a watcher error. + this.task.on('watcher:error', ({ id, error }) => { + let instance = this.INSTANCES.get(id); + instance?.onError(new Error(error)); + }); + + // Emitted when the worker is created and ready to receive method calls. + this.task.on('watcher:ready', () => { + this.PROMISE_META.get('self:start')?.resolve?.(); + }); + + // Logging from the worker. 
+ this.task.on('console:log', (args) => { + console.log(...args); }); - await this.watcher.start(); + this.task.on('console:warn', (args) => { + console.warn(...args); + }); + + this.task.on('console:error', (args) => { + console.error(...args); + }); + + this.initialized = true; } - doStop() { - return this.watcher.stop(); + static async startTask() { + // This is an unusual one-off task, so we'll use a special key for its + // promise metadata. + let meta = this.PROMISE_META.get('self:start'); + if (!meta) { + meta = {}; + let promise = new Promise((resolve, reject) => { + meta.resolve = resolve; + meta.reject = reject; + this.task.start(); + }); + meta.promise = promise; + this.PROMISE_META.set('self:start', meta); + } + await meta.promise; + this.started = true; + } + + static async sendEvent(event, args) { + let id = this.getID(); + let bundle = { id, event, args }; + let meta = {}; + let promise = new Promise((resolve, reject) => { + meta.resolve = resolve; + meta.reject = reject; + }); + meta.promise = promise; + this.PROMISE_META.set(id, meta); + this.task.send(JSON.stringify(bundle)); + let result = await promise; + return result; + } + + // Both instances and jobs have randomly-generated IDs. We use the job IDs + // for standard request/response cycles initiated by the renderer. We use + // the instance IDs for worker-initiated pushes that are routed directly + // to the corresponding watcher instance. + static getID() { + let id; + // Generate an ID that does not collide with any other IDs we're currently + // using. 
+ do { + id = crypto.randomBytes(5).toString('hex'); + } while (this.INSTANCES.has(id) || this.PROMISE_META.has(id)); + return id; + } + + dispose() { + super.dispose(); + this.constructor.unregister(this); + } + + constructor(...args) { + super(...args); + this.id = this.constructor.getID(); + + this.subs.add( + atom.config.observe('core.ignoredNames', (value) => { + this.setIgnoredNames(value); + }) + ); + } + + setIgnoredNames(ignoredNames) { + this.ignoredNames = ignoredNames; + if (this.state === WATCHER_STATE.RUNNING) { + // We can update this watcher without restarting it. We send a special + // event name that will look up the watcher, generate new ignores, and + // update the excluded paths list. + this.send('watcher:update', { + normalizedPath: this.normalizedPath, + instance: this.id, + ignored: this.ignoredNames + }); + } + } + + async send(event, args) { + await this.constructor.sendEvent(event, args); + } + + async doStart() { + // “Registration” would ordinarily happen earlier in the lifecycle of this + // instance. But (a) the purpose of it is to make the constructor know + // about our ID so it can funnel events to us, which isn't necessary until + // the watcher action starts; (b) if we register just before starting a + // watcher and unregister just after ending a watcher, we get to use it as + // a sort of reference-counting. That helps us know when the task itself + // can be killed. 
+ this.constructor.register(this); + if (!NSFWNativeWatcher.started) { + await NSFWNativeWatcher.startTask(); + } + + return await this.send('watcher:watch', { + normalizedPath: this.normalizedPath, + instance: this.id, + ignored: this.ignoredNames + }); + } + + async doStop() { + let result = await this.send('watcher:unwatch', { + normalizedPath: this.normalizedPath, + instance: this.id + }); + this.constructor.unregister(this); + return result; } } @@ -260,6 +613,15 @@ class NSFWNativeWatcher extends NativeWatcher { // * `oldPath` (for `renamed` events only), a {String} containing the // filesystem entry's former absolute path. class PathWatcher { + + static DEFAULT_OPTIONS = { + // Whether to normalize filesystem paths to take symlinks into account. The + // default, `true`, means that real paths will always be reported; a value + // of `false` means that the appropriate path for the watcher will be + // reported, even if this means converting a real path to a symlinked path. + realPaths: true + } + // Private: Instantiate a new PathWatcher. Call {watchPath} instead. // // * `nativeWatcherRegistry` {NativeWatcherRegistry} used to find and @@ -268,14 +630,18 @@ class PathWatcher { // watched filesystem tree. // * `options` See {watchPath} for options. // - constructor(nativeWatcherRegistry, watchedPath, _options) { + constructor(nativeWatcherRegistry, watchedPath, options) { this.watchedPath = watchedPath; this.nativeWatcherRegistry = nativeWatcherRegistry; + this.options = { ...PathWatcher.DEFAULT_OPTIONS, ...options }; this.normalizedPath = null; this.native = null; this.changeCallbacks = new Map(); + // Whether the entire `AtomEnvironment` is destroying. + this.isDestroying = false; + this.attachedPromise = new Promise(resolve => { this.resolveAttachedPromise = resolve; }); @@ -417,8 +783,14 @@ class PathWatcher { this.subs.add( native.onShouldDetach(({ replacement, watchedPath }) => { - // Don't re-attach if the entire environment is disposing. 
- if (atom.isDestroying) return; + // Ordinarily, when a single native watcher detaches, it might prompt + // the _creation_ of new watchers, since there might've been some paths + // that piggy-backed onto an existing watcher. + // + // But if the native watcher is detaching because the entire + // environment is destroying, then we absolutely should not attach a + // replacement watcher. + if (this.isDestroying) return; if ( this.native === native && replacement !== native && @@ -438,9 +810,46 @@ class PathWatcher { }) ); + this.subs.add( + atom.onWillDestroy(() => { + this.isDestroying = true; + // TODO: Be proactive about stopping file watchers? Or just set the + // flag so that they aren't recreated during teardown? + }) + ); + this.resolveAttachedPromise(); } + // Private: Given a "real" filesystem path, adjusts it (if necessary) to + // match the path that the user subscribed to. + // + // This saves the user from having to make their own calls to `fs.realpath` + // on their end just to do path equality checks. + denormalizePath(filePath) { + if (this.options.realPaths) return filePath; + if (this.watchedPath === this.normalizedPath) return filePath; + if (!filePath.startsWith(this.normalizedPath)) return filePath; + let rest = filePath.substring(this.normalizedPath.length); + return path.join(this.watchedPath, rest); + } + + // Private: Given an event that happened at a "real" filesystem path, adjusts + // it (if necessary) to match the path that the user subscribed to. + // + // This saves the user from having to make their own calls to `fs.realpath` + // on their end just to do path equality checks. 
+ denormalizeEvent(event) { + if (this.options.realPaths) return event; + if (this.watchedPath === this.normalizedPath) return event; + let result = { ...event }; + result.path = this.denormalizePath(event.path); + if (event.oldPath) { + result.oldPath = this.denormalizePath(event.oldPath); + } + return result; + } + // Private: Invoked when the attached native watcher creates a batch of // native filesystem events. The native watcher's events may include events // for paths above this watcher's root path, so filter them to only include @@ -450,6 +859,12 @@ class PathWatcher { eventPath.startsWith(this.normalizedPath); const filtered = []; + let index = {}; + for (let event of events) { + index[event.action] ??= []; + index[event.action].push(event); + } + for (let i = 0; i < events.length; i++) { const event = events[i]; @@ -460,21 +875,22 @@ class PathWatcher { if (srcWatched && destWatched) { filtered.push(event); } else if (srcWatched && !destWatched) { - filtered.push({ + filtered.push(this.denormalizeEvent({ action: 'deleted', kind: event.kind, path: event.oldPath - }); + })); } else if (!srcWatched && destWatched) { - filtered.push({ + filtered.push(this.denormalizeEvent({ action: 'created', kind: event.kind, - path: event.path - }); + path: this.denormalizePath(event.path) + })); } } else { if (isWatchedPath(event.path)) { - filtered.push(event); + let denormalizedEvent = this.denormalizeEvent(event); + filtered.push(denormalizedEvent); } } } @@ -555,36 +971,44 @@ class PathWatcherManager { // Private: Initialize global {PathWatcher} state. 
constructor(setting) { + PathWatcherManager.transitionPromise ??= Promise.resolve(); this.setting = setting; this.live = new Map(); - this.nativeRegistry = new NativeWatcherRegistry(normalizedPath => { - const nativeWatcher = new NSFWNativeWatcher(normalizedPath); + const initLocal = (NativeConstructor) => { + this.nativeRegistry = new NativeWatcherRegistry(normalizedPath => { + const nativeWatcher = new NativeConstructor(normalizedPath); + this.live.set(normalizedPath, nativeWatcher); + const sub = nativeWatcher.onWillStop(() => { + this.live.delete(normalizedPath); + sub.dispose(); + }); - this.live.set(normalizedPath, nativeWatcher); - const sub = nativeWatcher.onWillStop(() => { - this.live.delete(normalizedPath); - sub.dispose(); + return nativeWatcher; }); + } - return nativeWatcher; - }); + // Look up the proper watcher implementation based on the current value of + // the `core.fileSystemWatcher` setting. + let WatcherClass = WATCHERS_BY_VALUE[setting] ?? WATCHERS_BY_VALUE['default']; + initLocal(WatcherClass); this.isShuttingDown = false; } // Private: Create a {PathWatcher} tied to this global state. See {watchPath} // for detailed arguments. - async createWatcher(rootPath, eventCallback) { + async createWatcher(rootPath, eventCallback, options) { if (this.isShuttingDown) { await this.constructor.transitionPromise; return PathWatcherManager.active().createWatcher( rootPath, - eventCallback + eventCallback, + options ); } - const w = new PathWatcher(this.nativeRegistry, rootPath); + const w = new PathWatcher(this.nativeRegistry, rootPath, options); w.onDidChange(eventCallback); await w.getStartPromise(); return w; @@ -608,13 +1032,17 @@ class PathWatcherManager { // specified path. If you only need to watch events within the project's root // paths, use {Project::onDidChangeFiles} instead. // -// watchPath handles the efficient re-use of operating system resources across -// living watchers. 
Watching the same path more than once, or the child of a -// watched path, will re-use the existing native watcher. +// `watchPath` handles the efficient re-use of operating system resources +// across living watchers. Watching the same path more than once, or the child +// of a watched path, will re-use the existing native watcher. // // * `rootPath` {String} specifies the absolute path to the root of the // filesystem content to watch. -// * `options` Control the watcher's behavior. +// * `options` Control the watcher's behavior: +// * `realPaths` {Boolean} Whether to report real paths on disk for +// filesystem events. Default is `true`; a value of `false` will instead +// return paths on disk that will always descend from the given path, even +// if the real path of the file is different due to symlinks. // * `eventCallback` {Function} or other callable to be called each time a // batch of filesystem events is observed. // * `events` {Array} of objects that describe the events that have @@ -630,6 +1058,25 @@ class PathWatcherManager { // started. Note that every {PathWatcher} is a {Disposable}, so they can be // managed by a {CompositeDisposable} if desired. // +// The specific library used for file watching may vary over time and may be +// configurable via the `core.fileSystemWatcher` setting. Some implementations +// may work better than others on certain platforms, but all will abide by the +// same contract and should behave in similar fashion to one another. +// +// __Important note:__ `watchPath` will _always_ respect the patterns specified +// by the `core.ignoredNames` setting and will pass those exclusions to the +// underlying native file watcher implementation. This helps reduce the cost of +// recursive file-watching on certain platforms. +// +// Files that match `core.ignoredNames` may still trigger change handlers, but +// _directories_ that match `core.ignoredNames` will be excluded from recursive +// watchers. 
No filesystem activity that occurs within an excluded directory +// will ever trigger a change handler for `watchPath`. +// +// If you have a legitimate need to watch a path that will or could be listed +// in `core.ignoredNames`, you must instead use a non-recursive watcher on that +// path via {Directory::onDidChange}. +// // ```js // const {watchPath} = require('atom') // @@ -655,7 +1102,8 @@ class PathWatcherManager { function watchPath(rootPath, options, eventCallback) { return PathWatcherManager.active().createWatcher( rootPath, - eventCallback + eventCallback, + options ); } @@ -667,9 +1115,34 @@ function stopAllWatchers() { } // Private: Show the currently active native watchers in a formatted {String}. -watchPath.printWatchers = function () { +watchPath.printWatchers = function printWatchers() { return PathWatcherManager.active().print(); }; +// Private: Wait for new watchers to be created after a change to +// `core.fileSystemWatcher`. This is useful to have in the specs. +watchPath.waitForTransition = async function waitForTransition() { + await PathWatcherManager.transitionPromise; +}; + +// Private: Stop all watchers and reset `PathWatcherManager` to its initial +// state. +watchPath.reset = function reset() { + return PathWatcherManager.active().stopAllWatchers().then(() => { + PathWatcherManager.activeManager = null; + }); +} + +// Which implementation to use for each possible value of +// `core.fileSystemWatcher`. +// +// The 'default' value — which is, uh, the default — allows us to switch the +// default at a later date without affecting users that have opted into a +// specific watcher. +const WATCHERS_BY_VALUE = { + 'default': NSFWNativeWatcher, + 'nsfw': NSFWNativeWatcher, + 'parcel': ParcelWatcherNativeWatcher +}; module.exports = { watchPath, stopAllWatchers };