Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce LockBox for managing lock hierarchy #6

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
},
{
"selector": "variable",
"format": ["camelCase", "UPPER_CASE"],
"format": ["camelCase", "UPPER_CASE", "PascalCase"],
"leadingUnderscore": "allow",
"trailingUnderscore": "allowSingleOrDouble"
},
Expand Down
3 changes: 2 additions & 1 deletion src/Lock.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { MutexInterface } from 'async-mutex';
import type { ResourceAcquire } from '@matrixai/resources';
import type { Lockable } from './types';
import { Mutex, withTimeout } from 'async-mutex';
import { withF, withG } from '@matrixai/resources';
import { sleep, yieldMicro } from './utils';
import { ErrorAsyncLocksTimeout } from './errors';

class Lock {
class Lock implements Lockable {
protected _lock: Mutex = new Mutex();
protected _count: number = 0;

Expand Down
149 changes: 149 additions & 0 deletions src/LockBox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import type { ResourceAcquire, ResourceRelease } from '@matrixai/resources';
import type { Lockable, ToString } from './types';
import { withF, withG } from '@matrixai/resources';
import { ErrorAsyncLocksLockBoxConflict } from './errors';

type LockRequest<L extends Lockable> = [
key: ToString,
lockConstructor: new () => L,
...lockingParams: Parameters<L['lock']>
];

class LockBox<L extends Lockable> implements Lockable {
protected _locks: Map<string, L> = new Map();

public lock(...requests: Array<LockRequest<L>>): ResourceAcquire<LockBox<L>> {
return async () => {
// Convert to strings
// This creates a copy of the requests
let requests_: Array<[string, new () => L, ...Parameters<L['lock']>]> =
requests.map(([key, ...rest]) =>
typeof key === 'string' ? [key, ...rest] : [key.toString(), ...rest],
);
// Sort to ensure lock hierarchy
requests_.sort(([key1], [key2]) => {
// Deterministic string comparison according to 16-bit code units
if (key1 < key2) return -1;
if (key1 > key2) return 1;
return 0;
});
// Avoid duplicate locking
requests_ = requests_.filter(
([key], i, arr) => i === 0 || key !== arr[i - 1][0],
);
const locks: Array<[string, ResourceRelease, L]> = [];
for (const [key, LockConstructor, ...lockingParams] of requests_) {
let lock = this._locks.get(key);
if (lock == null) {
lock = new LockConstructor();
this._locks.set(key, lock);
} else {
// It is possible to swap the lock class, but only after the lock key is released
if (!(lock instanceof LockConstructor)) {
throw new ErrorAsyncLocksLockBoxConflict(
`Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`,
);
}
}
const lockAcquire = lock.lock(...lockingParams);
let lockRelease: ResourceRelease;
try {
[lockRelease] = await lockAcquire();
} catch (e) {
// Release all intermediate locks in reverse order
locks.reverse();
for (const [key, lockRelease, lock] of locks) {
await lockRelease();
if (!lock.isLocked()) {
this._locks.delete(key);
}
}
throw e;
}
locks.push([key, lockRelease, lock]);
}
return [
async () => {
// Release all locks in reverse order
locks.reverse();
for (const [key, lockRelease, lock] of locks) {
await lockRelease();
if (!lock.isLocked()) {
this._locks.delete(key);
}
}
},
this,
];
};
}

get locks(): ReadonlyMap<string, L> {
return this._locks;
}

public get count(): number {
let count = 0;
for (const lock of this._locks.values()) {
count += lock.count;
}
return count;
}

public isLocked(key?: ToString): boolean {
if (key == null) {
for (const lock of this._locks.values()) {
if (lock.isLocked()) return true;
}
return false;
} else {
const lock = this._locks.get(key.toString());
if (lock == null) return false;
return lock.isLocked();
}
}

public async waitForUnlock(timeout?: number, key?: ToString): Promise<void> {
if (key == null) {
const ps: Array<Promise<void>> = [];
for (const lock of this._locks.values()) {
ps.push(lock.waitForUnlock(timeout));
}
await Promise.all(ps);
} else {
const lock = this._locks.get(key.toString());
if (lock == null) return;
await lock.waitForUnlock(timeout);
}
}

public async withF<T>(
...params: [
...requests: Array<LockRequest<L>>,
f: (lockBox: LockBox<L>) => Promise<T>,
]
): Promise<T> {
const f = params.pop() as (lockBox: LockBox<L>) => Promise<T>;
return withF(
[this.lock(...(params as Array<LockRequest<L>>))],
([lockBox]) => f(lockBox),
);
}

public withG<T, TReturn, TNext>(
...params: [
...requests: Array<LockRequest<L>>,
g: (lockBox: LockBox<L>) => AsyncGenerator<T, TReturn, TNext>,
]
): AsyncGenerator<T, TReturn, TNext> {
const g = params.pop() as (
lockBox: LockBox<L>,
) => AsyncGenerator<T, TReturn, TNext>;
return withG(
[this.lock(...(params as Array<LockRequest<L>>))],
([lockBox]) => g(lockBox),
);
}
}

export default LockBox;
35 changes: 26 additions & 9 deletions src/RWLockReader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { MutexInterface } from 'async-mutex';
import type { ResourceAcquire } from '@matrixai/resources';
import type { Lockable } from './types';
import { Mutex, withTimeout } from 'async-mutex';
import { withF, withG } from '@matrixai/resources';
import { sleep, yieldMicro } from './utils';
Expand All @@ -8,20 +9,32 @@ import { ErrorAsyncLocksTimeout } from './errors';
/**
* Read-preferring read write lock
*/
class RWLockReader {
class RWLockReader implements Lockable {
protected _readerCount: number = 0;
protected _writerCount: number = 0;
protected lock: Mutex = new Mutex();
protected _lock: Mutex = new Mutex();
protected release: MutexInterface.Releaser;

public lock(
type: 'read' | 'write',
timeout?: number,
): ResourceAcquire<RWLockReader> {
switch (type) {
case 'read':
return this.read(timeout);
case 'write':
return this.write(timeout);
}
}

public read(timeout?: number): ResourceAcquire<RWLockReader> {
return async () => {
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
let lock: MutexInterface = this.lock;
let lock: MutexInterface = this._lock;
if (timeout != null) {
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout());
lock = withTimeout(this._lock, timeout, new ErrorAsyncLocksTimeout());
}
try {
this.release = await lock.acquire();
Expand Down Expand Up @@ -51,9 +64,9 @@ class RWLockReader {
public write(timeout?: number): ResourceAcquire<RWLockReader> {
return async () => {
++this._writerCount;
let lock: MutexInterface = this.lock;
let lock: MutexInterface = this._lock;
if (timeout != null) {
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout());
lock = withTimeout(this._lock, timeout, new ErrorAsyncLocksTimeout());
}
let release: MutexInterface.Releaser;
try {
Expand All @@ -74,6 +87,10 @@ class RWLockReader {
};
}

public get count(): number {
return this.readerCount + this.writerCount;
}

public get readerCount(): number {
return this._readerCount;
}
Expand All @@ -83,14 +100,14 @@ class RWLockReader {
}

public isLocked(): boolean {
return this.lock.isLocked();
return this._lock.isLocked();
}

public async waitForUnlock(timeout?: number): Promise<void> {
if (timeout != null) {
let timedOut = false;
await Promise.race([
this.lock.waitForUnlock(),
this._lock.waitForUnlock(),
sleep(timeout).then(() => {
timedOut = true;
}),
Expand All @@ -99,7 +116,7 @@ class RWLockReader {
throw new ErrorAsyncLocksTimeout();
}
} else {
await this.lock.waitForUnlock();
await this._lock.waitForUnlock();
}
}

Expand Down
19 changes: 18 additions & 1 deletion src/RWLockWriter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { MutexInterface } from 'async-mutex';
import type { ResourceAcquire } from '@matrixai/resources';
import type { Lockable } from './types';
import { performance } from 'perf_hooks';
import { Mutex, withTimeout } from 'async-mutex';
import { withF, withG } from '@matrixai/resources';
Expand All @@ -9,14 +10,26 @@ import { ErrorAsyncLocksTimeout } from './errors';
/**
* Write-preferring read write lock
*/
class RWLockWriter {
class RWLockWriter implements Lockable {
protected readersLock: Mutex = new Mutex();
protected writersLock: Mutex = new Mutex();
protected readersRelease: MutexInterface.Releaser;
protected readerCountBlocked: number = 0;
protected _readerCount: number = 0;
protected _writerCount: number = 0;

public lock(
type: 'read' | 'write',
timeout?: number,
): ResourceAcquire<RWLockWriter> {
switch (type) {
case 'read':
return this.read(timeout);
case 'write':
return this.write(timeout);
}
}

public read(timeout?: number): ResourceAcquire<RWLockWriter> {
return async () => {
const t1 = performance.now();
Expand Down Expand Up @@ -126,6 +139,10 @@ class RWLockWriter {
};
}

public get count(): number {
return this.readerCount + this.writerCount;
}

public get readerCount(): number {
return this._readerCount + this.readerCountBlocked;
}
Expand Down
13 changes: 11 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ class ErrorAsyncLocks<T> extends AbstractError<T> {
}

class ErrorAsyncLocksTimeout<T> extends ErrorAsyncLocks<T> {
static description = 'Async lock timeout';
static description = 'Async locks timeout';
}

export { ErrorAsyncLocks, ErrorAsyncLocksTimeout };
class ErrorAsyncLocksLockBoxConflict<T> extends ErrorAsyncLocks<T> {
static description =
'LockBox cannot lock same ID with different Lockable classes';
}

export {
ErrorAsyncLocks,
ErrorAsyncLocksTimeout,
ErrorAsyncLocksLockBoxConflict,
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export { default as Lock } from './Lock';
export { default as RWLockReader } from './RWLockReader';
export { default as RWLockWriter } from './RWLockWriter';
export { default as LockBox } from './LockBox';
export * as utils from './utils';
export * as errors from './errors';
export * from './types';
18 changes: 17 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import type { ResourceAcquire } from '@matrixai/resources';

/**
* Plain data dictionary
*/
type POJO = { [key: string]: any };

export type { POJO };
/**
* Any type that can be turned into a string
*/
interface ToString {
toString(): string;
}

interface Lockable {
count: number;
lock(...params: Array<unknown>): ResourceAcquire<Lockable>;
isLocked(): boolean;
waitForUnlock(timeout?: number): Promise<void>;
}

export type { POJO, ToString, Lockable };
Loading