Skip to content

Commit 3ffc8a1

Browse files
authored
Semaphore: Add timeout option (#1300)
Adds an optional `timeout` option to the Semaphore class that throws an error when the Semaphore cannot be acquired for `timeout` milliseconds. Related to #1287 ## Testing instructions Confirm the unit tests pass
1 parent 30a707c commit 3ffc8a1

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

packages/php-wasm/util/src/lib/semaphore.spec.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import Semaphore from './semaphore';
1+
import Semaphore, { AcquireTimeoutError } from './semaphore';
22

33
describe('RequestsPerIntervaledSemaphore', () => {
44
it('should limit the number of concurrent lock holders', async () => {
@@ -40,4 +40,13 @@ describe('RequestsPerIntervaledSemaphore', () => {
4040

4141
expect(semaphore.running).toBe(1);
4242
});
43+
it('should wait for the lock no longer than the timeout', async () => {
44+
const semaphore = new Semaphore({
45+
concurrency: 1,
46+
timeout: 1,
47+
});
48+
49+
await semaphore.acquire();
50+
expect(() => semaphore.acquire()).rejects.toThrow(AcquireTimeoutError);
51+
});
4352
});

packages/php-wasm/util/src/lib/semaphore.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,38 @@
1+
import { SleepFinished, sleep } from './sleep';
2+
13
export interface SemaphoreOptions {
4+
/**
5+
* The maximum number of concurrent locks.
6+
*/
27
concurrency: number;
8+
/**
9+
* The maximum time to wait for a lock to become available.
10+
*/
11+
timeout?: number;
12+
}
13+
14+
export class AcquireTimeoutError extends Error {
15+
constructor() {
16+
super('Acquiring lock timed out');
17+
}
318
}
419

520
export default class Semaphore {
621
private _running = 0;
722
private concurrency: number;
23+
private timeout?: number;
824
private queue: (() => void)[];
925

10-
constructor({ concurrency }: SemaphoreOptions) {
26+
constructor({ concurrency, timeout }: SemaphoreOptions) {
1127
this.concurrency = concurrency;
28+
this.timeout = timeout;
1229
this.queue = [];
1330
}
1431

32+
get remaining(): number {
33+
return this.concurrency - this.running;
34+
}
35+
1536
get running(): number {
1637
return this._running;
1738
}
@@ -20,7 +41,20 @@ export default class Semaphore {
2041
while (true) {
2142
if (this._running >= this.concurrency) {
2243
// Concurrency exhausted – wait until a lock is released:
23-
await new Promise<void>((resolve) => this.queue.push(resolve));
44+
const acquired = new Promise<void>((resolve) => {
45+
this.queue.push(resolve);
46+
});
47+
if (this.timeout !== undefined) {
48+
await Promise.race([acquired, sleep(this.timeout)]).then(
49+
(value) => {
50+
if (value === SleepFinished) {
51+
throw new AcquireTimeoutError();
52+
}
53+
}
54+
);
55+
} else {
56+
await acquired;
57+
}
2458
} else {
2559
// Acquire the lock:
2660
this._running++;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export const SleepFinished = Symbol('SleepFinished');
2+
3+
export function sleep(ms: number): Promise<typeof SleepFinished> {
4+
return new Promise((resolve) => {
5+
setTimeout(() => resolve(SleepFinished), ms);
6+
});
7+
}

0 commit comments

Comments
 (0)