diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 5b02213f22783..5c3c3caacd0ae 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -349,9 +349,11 @@ graph LR; libnpmexec-->npmcli-template-oss["@npmcli/template-oss"]; libnpmexec-->pacote; libnpmexec-->proc-log; + libnpmexec-->promise-retry; libnpmexec-->read-package-json-fast; libnpmexec-->read; libnpmexec-->semver; + libnpmexec-->signal-exit; libnpmexec-->tap; libnpmexec-->walk-up-path; libnpmfund-->npmcli-arborist["@npmcli/arborist"]; diff --git a/package-lock.json b/package-lock.json index 47eb40016b90e..ac6ad556dd6db 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18950,9 +18950,11 @@ "npm-package-arg": "^12.0.0", "pacote": "^21.0.0", "proc-log": "^5.0.0", + "promise-retry": "^2.0.1", "read": "^4.0.0", "read-package-json-fast": "^4.0.0", "semver": "^7.3.7", + "signal-exit": "^4.1.0", "walk-up-path": "^4.0.0" }, "devDependencies": { diff --git a/workspaces/libnpmexec/lib/index.js b/workspaces/libnpmexec/lib/index.js index 1dcc0c9453a44..7b4c85a7510a1 100644 --- a/workspaces/libnpmexec/lib/index.js +++ b/workspaces/libnpmexec/lib/index.js @@ -1,6 +1,6 @@ 'use strict' -const { dirname, resolve } = require('node:path') +const { dirname, join, resolve } = require('node:path') const crypto = require('node:crypto') const { mkdir } = require('node:fs/promises') const Arborist = require('@npmcli/arborist') @@ -16,6 +16,7 @@ const getBinFromManifest = require('./get-bin-from-manifest.js') const noTTY = require('./no-tty.js') const runScript = require('./run-script.js') const isWindows = require('./is-windows.js') +const withLock = require('./with-lock.js') const binPaths = [] @@ -247,7 +248,8 @@ const exec = async (opts) => { ...flatOptions, path: installDir, }) - const npxTree = await npxArb.loadActual() + const lockPath = join(installDir, 'concurrency.lock') + const npxTree = await withLock(lockPath, () => npxArb.loadActual()) await Promise.all(needInstall.map(async ({ spec }) => { const { manifest } = await missingFromTree({ spec, @@ -290,11 +292,11 @@ const exec = async (opts) => { } } } - await npxArb.reify({ + await withLock(lockPath, () => npxArb.reify({ ...flatOptions, save: true, add, - }) + })) } binPaths.push(resolve(installDir, 'node_modules/.bin')) const pkgJson = await PackageJson.load(installDir) diff --git a/workspaces/libnpmexec/lib/with-lock.js b/workspaces/libnpmexec/lib/with-lock.js new file mode 100644 index 0000000000000..bc8b6b6529789 --- /dev/null +++ b/workspaces/libnpmexec/lib/with-lock.js @@ -0,0 +1,164 @@ +const fs = require('node:fs/promises') +const { rmdirSync } = require('node:fs') +const promiseRetry = require('promise-retry') +const { onExit } = require('signal-exit') + +// a lockfile implementation inspired by the unmaintained proper-lockfile library +// +// similarities: +// - based on mkdir's atomicity +// - works across processes and even machines (via NFS) +// - cleans up after itself +// - detects compromised locks +// +// differences: +// - higher-level API (just a withLock function) +// - written in async/await style +// - uses mtime + inode for more reliable compromised lock detection +// - more ergonomic compromised lock handling (i.e. withLock will reject, and callbacks have access to an AbortSignal) +// - uses a more recent version of signal-exit + +const touchInterval = 100 +// mtime precision is platform dependent, so use a reasonably large threshold +const staleThreshold = 5_000 + +// track current locks and their cleanup functions +const currentLocks = new Map() + +function cleanupLocks () { + for (const [, cleanup] of currentLocks) { + try { + cleanup() + } catch (err) { + // + } + } +} + +// clean up any locks that were not released normally +onExit(cleanupLocks) + +/** + * Acquire an advisory lock for the given path and hold it for the duration of the callback. + * + * The lock will be released automatically when the callback resolves or rejects. + * Concurrent calls to withLock() for the same path will wait until the lock is released. + */ +async function withLock (lockPath, cb) { + try { + const signal = await acquireLock(lockPath) + return await new Promise((resolve, reject) => { + signal.addEventListener('abort', () => { + reject(Object.assign(new Error('Lock compromised'), { code: 'ECOMPROMISED' })) + }); + + (async () => { + try { + resolve(await cb(signal)) + } catch (err) { + reject(err) + } + })() + }) + } finally { + releaseLock(lockPath) + } +} + +function acquireLock (lockPath) { + return promiseRetry({ + minTimeout: 100, + maxTimeout: 5_000, + // if another process legitimately holds the lock, wait for it to release; if it dies abnormally and the lock becomes stale, we'll acquire it automatically + forever: true, + }, async (retry) => { + try { + await fs.mkdir(lockPath) + } catch (err) { + if (err.code !== 'EEXIST') { + throw err + } + + const status = await getLockStatus(lockPath) + + if (status === 'locked') { + // let's see if we can acquire it on the next attempt 🤞 + return retry(err) + } + if (status === 'stale') { + // there is a very tiny window where another process could also release the stale lock and acquire it before we release it here; the lock compromise checker should detect this and throw an error + deleteLock(lockPath, ['ENOENT', 'EBUSY']) // on windows, EBUSY can happen if another process is creating the lock; we'll just retry + } + return await acquireLock(lockPath) + } + try { + const signal = await maintainLock(lockPath) + return signal + } catch (err) { + throw Object.assign(new Error('Lock compromised'), { code: 'ECOMPROMISED' }) + } + }) +} + +function deleteLock (lockPath, ignoreCodes = ['ENOENT']) { + try { + // synchronous, so we can call in an exit handler + rmdirSync(lockPath) + } catch (err) { + if (!ignoreCodes.includes(err.code)) { + throw err + } + } +} + +function releaseLock (lockPath) { + currentLocks.get(lockPath)?.() + currentLocks.delete(lockPath) +} + +async function getLockStatus (lockPath) { + try { + const stat = await fs.stat(lockPath) + return (Date.now() - stat.mtimeMs > staleThreshold) ? 'stale' : 'locked' + } catch (err) { + if (err.code === 'ENOENT') { + return 'unlocked' + } + throw err + } +} + +async function maintainLock (lockPath) { + const controller = new AbortController() + const stats = await fs.stat(lockPath) + let mtimeMs = stats.mtimeMs + const signal = controller.signal + + async function touchLock () { + try { + const currentStats = (await fs.stat(lockPath)) + if (currentStats.ino !== stats.ino || currentStats.mtimeMs !== mtimeMs) { + throw new Error('Lock compromised') + } + mtimeMs = Date.now() + const mtime = new Date(mtimeMs) + await fs.utimes(lockPath, mtime, mtime) + } catch (err) { + // stats mismatch or other fs error means the lock was compromised, unless we just released the lock during this iteration + if (currentLocks.has(lockPath)) { + controller.abort() + } + } + } + + const timeout = setInterval(touchLock, touchInterval) + timeout.unref() + function cleanup () { + deleteLock(lockPath) + clearInterval(timeout) + } + currentLocks.set(lockPath, cleanup) + return signal +} + +module.exports = withLock diff --git a/workspaces/libnpmexec/package.json b/workspaces/libnpmexec/package.json index 91fb9eb8e9e3a..706f6db5d4794 100644 --- a/workspaces/libnpmexec/package.json +++ b/workspaces/libnpmexec/package.json @@ -67,9 +67,11 @@ "npm-package-arg": "^12.0.0", "pacote": "^21.0.0", "proc-log": "^5.0.0", + "promise-retry": "^2.0.1", "read": "^4.0.0", "read-package-json-fast": "^4.0.0", "semver": "^7.3.7", + "signal-exit": "^4.1.0", "walk-up-path": "^4.0.0" }, "templateOSS": { diff --git a/workspaces/libnpmexec/test/with-lock.js b/workspaces/libnpmexec/test/with-lock.js new file mode 100644 index 0000000000000..348f9753fd125 --- /dev/null +++ b/workspaces/libnpmexec/test/with-lock.js @@ -0,0 +1,248 @@ +const fs = require('node:fs') +const path = require('node:path') +const os = require('node:os') +const setTimeout = require('node:timers/promises').setTimeout + +const getTempDir = () => fs.realpathSync(os.tmpdir()) + +const t = require('tap') + +let mockMkdir +let mockStat +let mockUtimes +let mockRmdirSync +let onExitHandler +const withLock = t.mock('../lib/with-lock.js', { + // make various fs things mockable, but default to the real implementation + 'node:fs/promises': { + mkdir: async (...args) => { + return await (mockMkdir?.(...args) ?? fs.promises.mkdir(...args)) + }, + stat: async (...args) => { + return await (mockStat?.(...args) ?? fs.promises.stat(...args)) + }, + utimes: async (...args) => { + return await (mockUtimes?.(...args) ?? fs.promises.utimes(...args)) + }, + }, + 'node:fs': { + rmdirSync: (...args) => { + return (mockRmdirSync?.(...args) ?? fs.rmdirSync(...args)) + }, + }, + 'signal-exit': { + onExit: (handler) => { + onExitHandler = handler + }, + }, +}) + +t.beforeEach(() => { + mockMkdir = undefined + mockStat = undefined + mockUtimes = undefined + mockRmdirSync = undefined +}) + +t.test('concurrent locking', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + const events = [] + const lockPromise1 = withLock(lockPath, async () => { + events.push('lock1 acquired') + await setTimeout(100) + events.push('lock1 released') + }) + await setTimeout(50) // ensure lock1 is acquired before lock2 + const lockPromise2 = withLock(lockPath, async () => { + events.push('lock2 acquired') + await setTimeout(100) + events.push('lock2 released') + return 'lock2' + }) + await Promise.all([lockPromise1, lockPromise2]) + t.same(events, [ + 'lock1 acquired', + 'lock1 released', + 'lock2 acquired', + 'lock2 released', + ], 'should acquire locks in order and release them correctly') +}) + +t.test('unrelated locks', async (t) => { + const lockPath1 = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-1-')), 'concurrency.lock') + const lockPath2 = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-2-')), 'concurrency.lock') + const lockPromise1 = withLock(lockPath1, async () => { + await setTimeout(100) + return 'lock1' + }) + const lockPromise2 = withLock(lockPath2, async () => 'lock2') + t.equal(await lockPromise2, 'lock2', 'lock2 should not be blocked by lock1') + t.equal(await lockPromise1, 'lock1', 'lock1 should complete after lock2') +}) + +t.test('resolved value', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + const result = await withLock(lockPath, async () => 'test value') + t.equal(result, 'test value', 'should resolve to the same value as the callback') +}) + +t.test('rejection', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + await t.rejects(withLock(lockPath, async () => { + throw new Error('test error') + }), new Error('test error')) + t.equal(await withLock(lockPath, async () => 'test'), 'test', 'should allow subsequent locks after rejection') +}) + +t.test('stale lock takeover', async (t) => { + let mkdirCalls = 0 + mockMkdir = async () => { + if (++mkdirCalls === 1) { + throw Object.assign(new Error(), { code: 'EEXIST' }) + } + } + let statCalls = 0 + const mtimeMs = Date.now() + mockStat = async () => { + if (++statCalls === 1) { + return { mtimeMs: mtimeMs - 10_000 } + } else { + return { mtimeMs, ino: 1 } + } + } + mockUtimes = async () => {} + mockRmdirSync = () => {} + + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + const lockPromise = withLock(lockPath, async () => { + await setTimeout(100) + return 'test value' + }) + t.equal(await lockPromise, 'test value', 'should take over the lock') + t.equal(mkdirCalls, 2, 'should make two mkdir calls') +}) + +t.test('concurrent stale lock takeover', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + // make a stale lock + await fs.promises.mkdir(lockPath) + await fs.promises.utimes(lockPath, new Date(Date.now() - 10_000), new Date(Date.now() - 10_000)) + + const results = await Promise.allSettled([ + withLock(lockPath, () => 'lock1'), + withLock(lockPath, () => 'lock2'), + withLock(lockPath, () => 'lock3'), + ]) + // all locks should either be successfully acquired or get compromised (expected occasional race condition) + t.ok(results.every(result => result.status === 'fulfilled' || result.status === 'rejected' && result.reason.code === 'ECOMPROMISED')) +}) + +t.test('mkdir -> getLockStatus race', async (t) => { + // validate that we can acquire a lock when mkdir fails (due to the lock existing) + // but status indicates it's unlocked (i.e. lock was released after the mkdir call) + let mkdirCalls = 0 + mockMkdir = async () => { + if (++mkdirCalls === 1) { + throw Object.assign(new Error(), { code: 'EEXIST' }) + } + } + let statCalls = 0 + const mtimeMs = Date.now() + mockStat = async () => { + if (++statCalls === 1) { + throw Object.assign(new Error(), { code: 'ENOENT' }) + } else { + return { mtimeMs, ino: 1 } + } + } + mockUtimes = async () => {} + mockRmdirSync = () => {} + + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + const lockPromise = withLock(lockPath, async () => { + await setTimeout(100) + return 'test value' + }) + t.equal(await lockPromise, 'test value', 'should acquire the lock') + t.equal(mkdirCalls, 2, 'should make two mkdir calls') +}) + +t.test('unexpected errors', async (t) => { + t.test('can\'t create lock', async (t) => { + const lockPath = '/these/parent/directories/do/not/exist/so/it/should/fail.lock' + await t.rejects(withLock(lockPath, async () => {}), { code: 'ENOENT' }) + }) + + t.test('can\'t release lock', async (t) => { + mockRmdirSync = () => { + throw Object.assign(new Error(), { code: 'ENOTDIR' }) + } + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + await t.rejects(withLock(lockPath, async () => {}), { code: 'ENOTDIR' }) + }) + + t.test('existing lock becomes unreadable right before we check its status', async (t) => { + // someone else has the lock + mockMkdir = async () => { + throw Object.assign(new Error(), { code: 'EEXIST' }) + } + // we can't stat the lock file + mockStat = async () => { + throw Object.assign(new Error(), { code: 'EACCES' }) + } + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + await t.rejects(withLock(lockPath, async () => {}), { code: 'EACCES' }) + }) + + t.test('can\'t take over stale lock', async (t) => { + // someone else has the lock + mockMkdir = async () => { + throw Object.assign(new Error(), { code: 'EEXIST' }) + } + // it's stale + mockStat = async () => { + return { mtimeMs: Date.now() - 10_000 } + } + // but we can't release it + mockRmdirSync = () => { + throw Object.assign(new Error(), { code: 'ENOTDIR' }) + } + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + await t.rejects(withLock(lockPath, async () => {}), { code: 'ENOTDIR' }) + }) + + t.test('lock compromised (recreated)', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + + mockStat = async () => { + return { mtimeMs: Date.now(), ino: Math.floor(Math.random() * 1000000) } + } + await t.rejects(withLock(lockPath, () => setTimeout(1000)), { code: 'ECOMPROMISED' }) + }) + + t.test('lock compromised (deleted)', async (t) => { + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + + mockStat = async () => { + throw Object.assign(new Error(), { code: 'ENOENT' }) + } + await t.rejects(withLock(lockPath, () => setTimeout(1000)), { code: 'ECOMPROMISED' }) + }) +}) + +t.test('onExit handler', async (t) => { + t.ok(onExitHandler, 'should be registered') + let rmdirSyncCalls = 0 + + mockRmdirSync = () => { + rmdirSyncCalls++ + } + + const lockPath = path.join(fs.mkdtempSync(path.join(getTempDir(), 'test-')), 'concurrency.lock') + // don't await it since the promise never resolves + withLock(lockPath, () => new Promise(() => {})).catch(() => {}) + // ensure the lock is acquired + await setTimeout(0) + onExitHandler() + t.ok(rmdirSyncCalls > 0, 'should have removed outstanding locks') +})