Skip to content

Commit d2b68ac

Browse files
committed
add PartitionedSemaphore module (#5739)
1 parent f445b87 commit d2b68ac

File tree

4 files changed

+612
-0
lines changed

4 files changed

+612
-0
lines changed

.changeset/witty-jokes-lead.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
add experimental PartitionedSemaphore module
6+
7+
A `PartitionedSemaphore` is a concurrency primitive that can be used to
8+
control concurrent access to a resource across multiple partitions identified
9+
by keys.
10+
11+
The total number of permits is shared across all partitions, with waiting
12+
permits equally distributed among partitions using a round-robin strategy.
13+
14+
This is useful when you want to limit the total number of concurrent accesses
15+
to a resource, while still allowing for fair distribution of access across
16+
different partitions.
17+
18+
```ts
19+
import { Effect, PartitionedSemaphore } from "effect"
20+
21+
Effect.gen(function* () {
22+
const semaphore = yield* PartitionedSemaphore.make<string>({ permits: 5 })
23+
24+
// Take the first 5 permits with key "A", then the following permits will be
25+
// equally distributed between all the keys using a round-robin strategy
26+
yield* Effect.log("A").pipe(
27+
Effect.delay(1000),
28+
semaphore.withPermits("A", 1),
29+
Effect.replicateEffect(15, { concurrency: "unbounded" }),
30+
Effect.fork
31+
)
32+
yield* Effect.log("B").pipe(
33+
Effect.delay(1000),
34+
semaphore.withPermits("B", 1),
35+
Effect.replicateEffect(10, { concurrency: "unbounded" }),
36+
Effect.fork
37+
)
38+
yield* Effect.log("C").pipe(
39+
Effect.delay(1000),
40+
semaphore.withPermits("C", 1),
41+
Effect.replicateEffect(10, { concurrency: "unbounded" }),
42+
Effect.fork
43+
)
44+
45+
return yield* Effect.never
46+
}).pipe(Effect.runFork)
47+
```
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/**
2+
* @since 3.19.4
3+
* @experimental
4+
*/
5+
import * as Effect from "./Effect.js"
6+
import * as Iterable from "./Iterable.js"
7+
import * as MutableHashMap from "./MutableHashMap.js"
8+
import * as Option from "./Option.js"
9+
10+
/**
11+
* @since 3.19.4
12+
* @category Models
13+
* @experimental
14+
*/
15+
export const TypeId: TypeId = "~effect/PartitionedSemaphore"
16+
17+
/**
18+
* @since 3.19.4
19+
* @category Models
20+
* @experimental
21+
*/
22+
export type TypeId = "~effect/PartitionedSemaphore"
23+
24+
/**
25+
* A `PartitionedSemaphore` is a concurrency primitive that can be used to
26+
* control concurrent access to a resource across multiple partitions identified
27+
* by keys.
28+
*
29+
* The total number of permits is shared across all partitions, with waiting
30+
* permits equally distributed among partitions using a round-robin strategy.
31+
*
32+
* This is useful when you want to limit the total number of concurrent accesses
33+
* to a resource, while still allowing for fair distribution of access across
34+
* different partitions.
35+
*
36+
* @since 3.19.4
37+
* @category Models
38+
* @experimental
39+
*/
40+
export interface PartitionedSemaphore<in K> {
41+
readonly [TypeId]: TypeId
42+
43+
readonly withPermits: (
44+
key: K,
45+
permits: number
46+
) => <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
47+
}
48+
49+
/**
50+
* A `PartitionedSemaphore` is a concurrency primitive that can be used to
51+
* control concurrent access to a resource across multiple partitions identified
52+
* by keys.
53+
*
54+
* The total number of permits is shared across all partitions, with waiting
55+
* permits equally distributed among partitions using a round-robin strategy.
56+
*
57+
* This is useful when you want to limit the total number of concurrent accesses
58+
* to a resource, while still allowing for fair distribution of access across
59+
* different partitions.
60+
*
61+
* @since 3.19.4
62+
* @category Constructors
63+
* @experimental
64+
*/
65+
export const makeUnsafe = <K = unknown>(options: {
66+
readonly permits: number
67+
}): PartitionedSemaphore<K> => {
68+
const maxPermits = Math.max(0, options.permits)
69+
70+
if (!Number.isFinite(maxPermits)) {
71+
return {
72+
[TypeId]: TypeId,
73+
withPermits: () => (effect) => effect
74+
}
75+
}
76+
77+
let totalPermits = maxPermits
78+
let waitingPermits = 0
79+
80+
type Waiter = {
81+
permits: number
82+
readonly resume: () => void
83+
}
84+
const partitions = MutableHashMap.empty<K, Set<Waiter>>()
85+
86+
const take = (key: K, permits: number) =>
87+
Effect.async<void>((resume) => {
88+
if (maxPermits < permits) {
89+
return resume(Effect.never)
90+
} else if (totalPermits >= permits) {
91+
totalPermits -= permits
92+
return resume(Effect.void)
93+
}
94+
95+
const needed = permits - totalPermits
96+
const taken = permits - needed
97+
if (totalPermits > 0) {
98+
totalPermits = 0
99+
}
100+
waitingPermits += needed
101+
102+
const waiters = Option.getOrElse(
103+
MutableHashMap.get(partitions, key),
104+
() => {
105+
const set = new Set<Waiter>()
106+
MutableHashMap.set(partitions, key, set)
107+
return set
108+
}
109+
)
110+
111+
const entry: Waiter = {
112+
permits: needed,
113+
resume() {
114+
cleanup()
115+
resume(Effect.void)
116+
}
117+
}
118+
function cleanup() {
119+
waiters.delete(entry)
120+
if (waiters.size === 0) {
121+
MutableHashMap.remove(partitions, key)
122+
}
123+
}
124+
waiters.add(entry)
125+
return Effect.sync(() => {
126+
cleanup()
127+
waitingPermits -= entry.permits
128+
if (taken > 0) {
129+
releaseUnsafe(taken)
130+
}
131+
})
132+
})
133+
134+
let iterator = partitions[Symbol.iterator]()
135+
const releaseUnsafe = (permits: number) => {
136+
while (permits > 0) {
137+
if (waitingPermits === 0) {
138+
totalPermits += permits
139+
return
140+
}
141+
142+
let state = iterator.next()
143+
if (state.done) {
144+
iterator = partitions[Symbol.iterator]()
145+
state = iterator.next()
146+
if (state.done) return
147+
}
148+
149+
const entry = Iterable.unsafeHead(state.value[1])
150+
entry.permits--
151+
waitingPermits--
152+
if (entry.permits === 0) entry.resume()
153+
permits--
154+
}
155+
}
156+
157+
return {
158+
[TypeId]: TypeId,
159+
withPermits: (key, permits) => {
160+
const takePermits = take(key, permits)
161+
const release: <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R> = Effect.matchCauseEffect({
162+
onFailure(cause) {
163+
releaseUnsafe(permits)
164+
return Effect.failCause(cause)
165+
},
166+
onSuccess(value) {
167+
releaseUnsafe(permits)
168+
return Effect.succeed(value)
169+
}
170+
})
171+
return (effect) =>
172+
Effect.uninterruptibleMask((restore) =>
173+
Effect.flatMap(
174+
restore(takePermits),
175+
() => release(restore(effect))
176+
)
177+
)
178+
}
179+
}
180+
}
181+
182+
/**
183+
* A `PartitionedSemaphore` is a concurrency primitive that can be used to
184+
* control concurrent access to a resource across multiple partitions identified
185+
* by keys.
186+
*
187+
* The total number of permits is shared across all partitions, with waiting
188+
* permits equally distributed among partitions using a round-robin strategy.
189+
*
190+
* This is useful when you want to limit the total number of concurrent accesses
191+
* to a resource, while still allowing for fair distribution of access across
192+
* different partitions.
193+
*
194+
* @since 3.19.4
195+
* @category Constructors
196+
* @experimental
197+
*/
198+
export const make = <K = unknown>(options: {
199+
readonly permits: number
200+
}): Effect.Effect<PartitionedSemaphore<K>> => Effect.sync(() => makeUnsafe<K>(options))

packages/effect/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,12 @@ export * as Ordering from "./Ordering.js"
10901090
*/
10911091
export * as ParseResult from "./ParseResult.js"
10921092

1093+
/**
1094+
* @since 3.19.4
1095+
* @experimental
1096+
*/
1097+
export * as PartitionedSemaphore from "./PartitionedSemaphore.js"
1098+
10931099
/**
10941100
* @since 2.0.0
10951101
*/

0 commit comments

Comments
 (0)