diff --git a/.eslintrc b/.eslintrc index f8bc2cb..5e9e3c4 100644 --- a/.eslintrc +++ b/.eslintrc @@ -91,7 +91,7 @@ }, { "selector": "variable", - "format": ["camelCase", "UPPER_CASE"], + "format": ["camelCase", "UPPER_CASE", "PascalCase"], "leadingUnderscore": "allow", "trailingUnderscore": "allowSingleOrDouble" }, diff --git a/src/Lock.ts b/src/Lock.ts index c27ef33..703bb28 100644 --- a/src/Lock.ts +++ b/src/Lock.ts @@ -1,11 +1,12 @@ import type { MutexInterface } from 'async-mutex'; import type { ResourceAcquire } from '@matrixai/resources'; +import type { Lockable } from './types'; import { Mutex, withTimeout } from 'async-mutex'; import { withF, withG } from '@matrixai/resources'; import { sleep, yieldMicro } from './utils'; import { ErrorAsyncLocksTimeout } from './errors'; -class Lock { +class Lock implements Lockable { protected _lock: Mutex = new Mutex(); protected _count: number = 0; diff --git a/src/LockBox.ts b/src/LockBox.ts new file mode 100644 index 0000000..545c321 --- /dev/null +++ b/src/LockBox.ts @@ -0,0 +1,149 @@ +import type { ResourceAcquire, ResourceRelease } from '@matrixai/resources'; +import type { Lockable, ToString } from './types'; +import { withF, withG } from '@matrixai/resources'; +import { ErrorAsyncLocksLockBoxConflict } from './errors'; + +type LockRequest = [ + key: ToString, + lockConstructor: new () => L, + ...lockingParams: Parameters +]; + +class LockBox implements Lockable { + protected _locks: Map = new Map(); + + public lock(...requests: Array>): ResourceAcquire> { + return async () => { + // Convert to strings + // This creates a copy of the requests + let requests_: Array<[string, new () => L, ...Parameters]> = + requests.map(([key, ...rest]) => + typeof key === 'string' ? [key, ...rest] : [key.toString(), ...rest], + ); + // Sort to ensure lock hierarchy + requests_.sort(([key1], [key2]) => { + // Deterministic string comparison according to 16-bit code units + if (key1 < key2) return -1; + if (key1 > key2) return 1; + return 0; + }); + // Avoid duplicate locking + requests_ = requests_.filter( + ([key], i, arr) => i === 0 || key !== arr[i - 1][0], + ); + const locks: Array<[string, ResourceRelease, L]> = []; + for (const [key, LockConstructor, ...lockingParams] of requests_) { + let lock = this._locks.get(key); + if (lock == null) { + lock = new LockConstructor(); + this._locks.set(key, lock); + } else { + // It is possible to swap the lock class, but only after the lock key is released + if (!(lock instanceof LockConstructor)) { + throw new ErrorAsyncLocksLockBoxConflict( + `Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`, + ); + } + } + const lockAcquire = lock.lock(...lockingParams); + let lockRelease: ResourceRelease; + try { + [lockRelease] = await lockAcquire(); + } catch (e) { + // Release all intermediate locks in reverse order + locks.reverse(); + for (const [key, lockRelease, lock] of locks) { + await lockRelease(); + if (!lock.isLocked()) { + this._locks.delete(key); + } + } + throw e; + } + locks.push([key, lockRelease, lock]); + } + return [ + async () => { + // Release all locks in reverse order + locks.reverse(); + for (const [key, lockRelease, lock] of locks) { + await lockRelease(); + if (!lock.isLocked()) { + this._locks.delete(key); + } + } + }, + this, + ]; + }; + } + + get locks(): ReadonlyMap { + return this._locks; + } + + public get count(): number { + let count = 0; + for (const lock of this._locks.values()) { + count += lock.count; + } + return count; + } + + public isLocked(key?: ToString): boolean { + if (key == null) { + for (const lock of this._locks.values()) { + if (lock.isLocked()) return true; + } + return false; + } else { + const lock = this._locks.get(key.toString()); + if (lock == null) return false; + return lock.isLocked(); + } + } + + public async waitForUnlock(timeout?: number, key?: ToString): Promise { + if (key == null) { + const ps: Array> = []; + for (const lock of this._locks.values()) { + ps.push(lock.waitForUnlock(timeout)); + } + await Promise.all(ps); + } else { + const lock = this._locks.get(key.toString()); + if (lock == null) return; + await lock.waitForUnlock(timeout); + } + } + + public async withF( + ...params: [ + ...requests: Array>, + f: (lockBox: LockBox) => Promise, + ] + ): Promise { + const f = params.pop() as (lockBox: LockBox) => Promise; + return withF( + [this.lock(...(params as Array>))], + ([lockBox]) => f(lockBox), + ); + } + + public withG( + ...params: [ + ...requests: Array>, + g: (lockBox: LockBox) => AsyncGenerator, + ] + ): AsyncGenerator { + const g = params.pop() as ( + lockBox: LockBox, + ) => AsyncGenerator; + return withG( + [this.lock(...(params as Array>))], + ([lockBox]) => g(lockBox), + ); + } +} + +export default LockBox; diff --git a/src/RWLockReader.ts b/src/RWLockReader.ts index 2105a19..fd10996 100644 --- a/src/RWLockReader.ts +++ b/src/RWLockReader.ts @@ -1,5 +1,6 @@ import type { MutexInterface } from 'async-mutex'; import type { ResourceAcquire } from '@matrixai/resources'; +import type { Lockable } from './types'; import { Mutex, withTimeout } from 'async-mutex'; import { withF, withG } from '@matrixai/resources'; import { sleep, yieldMicro } from './utils'; @@ -8,20 +9,32 @@ import { ErrorAsyncLocksTimeout } from './errors'; /** * Read-preferring read write lock */ -class RWLockReader { +class RWLockReader implements Lockable { protected _readerCount: number = 0; protected _writerCount: number = 0; - protected lock: Mutex = new Mutex(); + protected _lock: Mutex = new Mutex(); protected release: MutexInterface.Releaser; + public lock( + type: 'read' | 'write', + timeout?: number, + ): ResourceAcquire { + switch (type) { + case 'read': + return this.read(timeout); + case 'write': + return this.write(timeout); + } + } + public read(timeout?: number): ResourceAcquire { return async () => { const readerCount = ++this._readerCount; // The first reader locks if (readerCount === 1) { - let lock: MutexInterface = this.lock; + let lock: MutexInterface = this._lock; if (timeout != null) { - lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout()); + lock = withTimeout(this._lock, timeout, new ErrorAsyncLocksTimeout()); } try { this.release = await lock.acquire(); @@ -51,9 +64,9 @@ class RWLockReader { public write(timeout?: number): ResourceAcquire { return async () => { ++this._writerCount; - let lock: MutexInterface = this.lock; + let lock: MutexInterface = this._lock; if (timeout != null) { - lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout()); + lock = withTimeout(this._lock, timeout, new ErrorAsyncLocksTimeout()); } let release: MutexInterface.Releaser; try { @@ -74,6 +87,10 @@ class RWLockReader { }; } + public get count(): number { + return this.readerCount + this.writerCount; + } + public get readerCount(): number { return this._readerCount; } @@ -83,14 +100,14 @@ class RWLockReader { } public isLocked(): boolean { - return this.lock.isLocked(); + return this._lock.isLocked(); } public async waitForUnlock(timeout?: number): Promise { if (timeout != null) { let timedOut = false; await Promise.race([ - this.lock.waitForUnlock(), + this._lock.waitForUnlock(), sleep(timeout).then(() => { timedOut = true; }), @@ -99,7 +116,7 @@ class RWLockReader { throw new ErrorAsyncLocksTimeout(); } } else { - await this.lock.waitForUnlock(); + await this._lock.waitForUnlock(); } } diff --git a/src/RWLockWriter.ts b/src/RWLockWriter.ts index 92ad7b3..7a73652 100644 --- a/src/RWLockWriter.ts +++ b/src/RWLockWriter.ts @@ -1,5 +1,6 @@ import type { MutexInterface } from 'async-mutex'; import type { ResourceAcquire } from '@matrixai/resources'; +import type { Lockable } from './types'; import { performance } from 'perf_hooks'; import { Mutex, withTimeout } from 'async-mutex'; import { withF, withG } from '@matrixai/resources'; @@ -9,7 +10,7 @@ import { ErrorAsyncLocksTimeout } from './errors'; /** * Write-preferring read write lock */ -class RWLockWriter { +class RWLockWriter implements Lockable { protected readersLock: Mutex = new Mutex(); protected writersLock: Mutex = new Mutex(); protected readersRelease: MutexInterface.Releaser; @@ -17,6 +18,18 @@ class RWLockWriter { protected _readerCount: number = 0; protected _writerCount: number = 0; + public lock( + type: 'read' | 'write', + timeout?: number, + ): ResourceAcquire { + switch (type) { + case 'read': + return this.read(timeout); + case 'write': + return this.write(timeout); + } + } + public read(timeout?: number): ResourceAcquire { return async () => { const t1 = performance.now(); @@ -126,6 +139,10 @@ class RWLockWriter { }; } + public get count(): number { + return this.readerCount + this.writerCount; + } + public get readerCount(): number { return this._readerCount + this.readerCountBlocked; } diff --git a/src/errors.ts b/src/errors.ts index ce2ed23..faa4b5c 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -5,7 +5,16 @@ class ErrorAsyncLocks extends AbstractError { } class ErrorAsyncLocksTimeout extends ErrorAsyncLocks { - static description = 'Async lock timeout'; + static description = 'Async locks timeout'; } -export { ErrorAsyncLocks, ErrorAsyncLocksTimeout }; +class ErrorAsyncLocksLockBoxConflict extends ErrorAsyncLocks { + static description = + 'LockBox cannot lock same ID with different Lockable classes'; +} + +export { + ErrorAsyncLocks, + ErrorAsyncLocksTimeout, + ErrorAsyncLocksLockBoxConflict, +}; diff --git a/src/index.ts b/src/index.ts index 160836f..3840721 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,7 @@ export { default as Lock } from './Lock'; export { default as RWLockReader } from './RWLockReader'; export { default as RWLockWriter } from './RWLockWriter'; +export { default as LockBox } from './LockBox'; export * as utils from './utils'; export * as errors from './errors'; export * from './types'; diff --git a/src/types.ts b/src/types.ts index 7995e54..a560860 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,6 +1,22 @@ +import type { ResourceAcquire } from '@matrixai/resources'; + /** * Plain data dictionary */ type POJO = { [key: string]: any }; -export type { POJO }; +/** + * Any type that can be turned into a string + */ +interface ToString { + toString(): string; +} + +interface Lockable { + count: number; + lock(...params: Array): ResourceAcquire; + isLocked(): boolean; + waitForUnlock(timeout?: number): Promise; +} + +export type { POJO, ToString, Lockable }; diff --git a/tests/LockBox.test.ts b/tests/LockBox.test.ts new file mode 100644 index 0000000..a09c6f6 --- /dev/null +++ b/tests/LockBox.test.ts @@ -0,0 +1,337 @@ +import { withF, withG } from '@matrixai/resources'; +import LockBox from '@/LockBox'; +import Lock from '@/Lock'; +import RWLockReader from '@/RWLockReader'; +import RWLockWriter from '@/RWLockWriter'; +import * as utils from '@/utils'; +import * as errors from '@/errors'; + +describe(LockBox.name, () => { + test('withF', async () => { + const lockBox = new LockBox(); + const p = withF([lockBox.lock(['1', Lock])], async ([lockBox]) => { + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + }); + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + await p; + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + }); + test('withG', async () => { + const lockBox = new LockBox(); + const g1 = withG( + [lockBox.lock(['1', RWLockReader, 'write'])], + async function* ([lockBox]): AsyncGenerator { + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + yield 'first'; + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + yield 'second'; + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + return 'last'; + }, + ); + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + for await (const _ of g1) { + // It should be locked during iteration + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + } + // Note that for await consumes the returned value + // But does not provide a way to retrieve it + expect(await g1.next()).toStrictEqual({ + value: undefined, + done: true, + }); + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + // To actually get the value use while loop or explicit `next()` + const g2 = withG( + [lockBox.lock(['1', RWLockReader, 'write'])], + async function* (): AsyncGenerator { + yield 'first'; + yield 'second'; + return 'last'; + }, + ); + // Unlocked before the first next + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + const firstP = g2.next(); + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + await firstP; + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + const secondP = g2.next(); + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + await secondP; + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + const lastP = g2.next(); + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + await lastP; + // Unlocked after the return + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + }); + test('lock count', async () => { + const lockBox = new LockBox(); + const p = Promise.all([ + lockBox.withF(['1', Lock], async () => undefined), + lockBox.withF(['2', RWLockReader, 'write'], async () => undefined), + lockBox.withF(['3', RWLockReader, 'read'], async () => undefined), + lockBox.withF(['4', RWLockWriter, 'write'], async () => undefined), + lockBox.withF(['5', RWLockWriter, 'read'], async () => undefined), + ]); + expect(lockBox.count).toBe(5); + await p; + }); + test('wait for unlock', async () => { + const lockBox = new LockBox(); + let value; + const p1 = withF([lockBox.lock(['1', Lock])], async () => { + value = 'p1'; + await utils.sleep(100); + }); + const p2 = lockBox.waitForUnlock(undefined, '1').then(() => { + value = 'p2'; + }); + await p1; + await p2; + expect(value).toBe('p2'); + }); + test('wait for unlock all', async () => { + const lockBox = new LockBox(); + let value; + const p1 = withF( + [lockBox.lock(['1', Lock], [2, RWLockWriter, 'write'])], + async () => { + value = 'p1'; + await utils.sleep(100); + }, + ); + const p2 = lockBox.waitForUnlock().then(() => { + value = 'p2'; + }); + await p1; + await p2; + expect(value).toBe('p2'); + }); + test('unlock when exception is thrown', async () => { + const lockBox = new LockBox(); + await expect( + lockBox.withF( + [1, Lock], + [2, RWLockWriter, 'write'], + [3, RWLockReader, 'read'], + async () => { + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(3); + throw new Error('oh no'); + }, + ), + ).rejects.toThrow('oh no'); + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + }); + test('mutual exclusion', async () => { + const lockBox = new LockBox(); + let value = 0; + await Promise.all([ + lockBox.withF(['somelock', Lock], async () => { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + }), + lockBox.withF(['somelock', Lock], async () => { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + }), + ]); + expect(value).toBe(2); + value = 0; + await Promise.all([ + (async () => { + const g = lockBox.withG( + ['somelock', Lock], + async function* (): AsyncGenerator { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + return 'last'; + }, + ); + for await (const _ of g) { + } + })(), + (async () => { + const g = lockBox.withG( + ['somelock', Lock], + async function* (): AsyncGenerator { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + return 'last'; + }, + ); + for await (const _ of g) { + } + })(), + ]); + expect(value).toBe(2); + }); + test('timeout', async () => { + const lockBox = new LockBox(); + await withF([lockBox.lock([1, Lock, 0])], async ([lock]) => { + expect(lock.isLocked()).toBe(true); + expect(lock.count).toBe(1); + const f = jest.fn(); + await expect(withF([lockBox.lock([1, Lock, 100])], f)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + expect(f).not.toBeCalled(); + expect(lock.isLocked()).toBe(true); + expect(lock.count).toBe(1); + }); + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + await lockBox.withF([1, Lock, 100], async () => { + const f = jest.fn(); + await expect(lockBox.withF([1, Lock, 100], f)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + expect(f).not.toBeCalled(); + }); + const g = lockBox.withG([1, Lock, 100], async function* () { + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + const f = jest.fn(); + const g = lockBox.withG([1, Lock, 100], f); + await expect(g.next()).rejects.toThrow(errors.ErrorAsyncLocksTimeout); + expect(f).not.toBeCalled(); + expect(lockBox.isLocked()).toBe(true); + expect(lockBox.count).toBe(1); + }); + await g.next(); + expect(lockBox.isLocked()).toBe(false); + expect(lockBox.count).toBe(0); + }); + test('timeout waiting for unlock', async () => { + const lockBox = new LockBox(); + await lockBox.waitForUnlock(100); + await withF([lockBox.lock([1, Lock])], async ([lockBox]) => { + await lockBox.waitForUnlock(100, 2); + await expect(lockBox.waitForUnlock(100, 1)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + await expect(lockBox.waitForUnlock(100)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + }); + await lockBox.waitForUnlock(100); + const g = withG([lockBox.lock([1, Lock])], async function* ([lockBox]) { + await lockBox.waitForUnlock(100, 2); + await expect(lockBox.waitForUnlock(100, 1)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + await expect(lockBox.waitForUnlock(100)).rejects.toThrow( + errors.ErrorAsyncLocksTimeout, + ); + }); + await g.next(); + await lockBox.waitForUnlock(100); + }); + test('multiple types of locks', async () => { + const lockBox = new LockBox(); + await lockBox.withF( + [1, Lock], + [2, RWLockReader, 'write'], + [3, RWLockWriter, 'read'], + async (lockBox) => { + expect(lockBox.isLocked(1)).toBe(true); + expect(lockBox.isLocked(2)).toBe(true); + expect(lockBox.isLocked(3)).toBe(true); + const f = jest.fn(); + await expect( + withF([lockBox.lock([1, Lock, 100], [2, Lock, 100])], f), + ).rejects.toThrow(errors.ErrorAsyncLocksTimeout); + expect(f).not.toBeCalled(); + }, + ); + }); + test('cannot use different lock type on the same active key', async () => { + const lockBox = new LockBox(); + await lockBox.withF([3, Lock], async (lockBox) => { + const f = jest.fn(); + await expect( + lockBox.withF([3, RWLockReader, 'write'], f), + ).rejects.toThrow(errors.ErrorAsyncLocksLockBoxConflict); + expect(f).not.toBeCalled(); + }); + await lockBox.withF([3, RWLockReader, 'write'], async (lockBox) => { + const f = jest.fn(); + await expect(lockBox.withF([3, Lock], f)).rejects.toThrow( + errors.ErrorAsyncLocksLockBoxConflict, + ); + expect(f).not.toBeCalled(); + }); + }); + test('prevent deadlocks with lock hierarchy via sorted lock keys', async () => { + const lockBox = new LockBox(); + let value = 0; + await Promise.all([ + lockBox.withF([1, Lock], [2, Lock], [3, Lock], [4, Lock], async () => { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + }), + lockBox.withF([4, Lock], [3, Lock], [2, Lock], [1, Lock], async () => { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + }), + ]); + expect(value).toBe(2); + value = 0; + const g1 = lockBox.withG( + ['1', Lock], + ['2', Lock], + async function* (): AsyncGenerator { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + return 'last'; + }, + ); + const g2 = lockBox.withG( + ['2', Lock], + ['1', Lock], + async function* (): AsyncGenerator { + const value_ = value + 1; + await utils.sleep(100); + value = value_; + return 'last'; + }, + ); + await Promise.all([ + (async () => { + for await (const _ of g1) { + } + })(), + (async () => { + for await (const _ of g2) { + } + })(), + ]); + expect(value).toBe(2); + }); +});