Skip to content

Commit 0a9ec23

Browse files
authored
fix multiple persisted requests subscribing to the same id (#5484)
1 parent 333be04 commit 0a9ec23

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

.changeset/tame-birds-shout.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
fix multiple persisted requests subscribing to the same id

packages/cluster/src/Runners.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -195,34 +195,53 @@ export const make: (options: Omit<Runners["Type"], "sendLocal" | "notifyLocal">)
195195

196196
type StorageRequestEntry = {
197197
readonly latch: Effect.Latch
198+
doneLatch: Effect.Latch | undefined
199+
readonly messages: Array<Message.OutgoingRequest<any>>
198200
replies: Array<Reply.Reply<any>>
199201
}
200202
const storageRequests = new Map<Snowflake.Snowflake, StorageRequestEntry>()
201203
const waitingStorageRequests = new Map<Snowflake.Snowflake, Message.OutgoingRequest<any>>()
202204
const replyFromStorage = Effect.fnUntraced(
203205
function*(message: Message.OutgoingRequest<any>) {
204-
const entry: StorageRequestEntry = {
205-
latch: Effect.unsafeMakeLatch(false),
206-
replies: []
206+
let entry = storageRequests.get(message.envelope.requestId)
207+
if (!entry) {
208+
entry = {
209+
latch: Effect.unsafeMakeLatch(false),
210+
doneLatch: undefined,
211+
replies: [],
212+
messages: []
213+
}
214+
storageRequests.set(message.envelope.requestId, entry)
215+
}
216+
entry.messages.push(message)
217+
if (entry.messages.length > 1) {
218+
entry.doneLatch ??= Effect.unsafeMakeLatch(false)
219+
return yield* entry.doneLatch.await
207220
}
208-
storageRequests.set(message.envelope.requestId, entry)
209221

210222
while (true) {
211223
// wait for the storage loop to notify us
212224
entry.latch.unsafeClose()
213225
waitingStorageRequests.set(message.envelope.requestId, message)
214-
yield* storageLatch.open
226+
storageLatch.unsafeOpen()
215227
yield* entry.latch.await
216228

217229
// send the replies back
218-
for (const reply of entry.replies) {
230+
for (let i = 0; i < entry.replies.length; i++) {
231+
const reply = entry.replies[i]
219232
// we have reached the end
220233
if (reply._tag === "WithExit") {
221-
return yield* message.respond(reply)
234+
for (let j = 0; j < entry.messages.length; j++) {
235+
yield* entry.messages[j].respond(reply)
236+
}
237+
entry.doneLatch?.unsafeOpen()
238+
return
222239
}
223240

224241
entry.latch.unsafeClose()
225-
yield* message.respond(reply)
242+
for (let i = 0; i < entry.messages.length; i++) {
243+
yield* entry.messages[i].respond(reply)
244+
}
226245
yield* entry.latch.await
227246
}
228247
entry.replies = []
@@ -261,17 +280,16 @@ export const make: (options: Omit<Runners["Type"], "sendLocal" | "notifyLocal">)
261280
const foundRequests = new Set<StorageRequestEntry>()
262281

263282
// put the replies into the storage requests and then open the latches
264-
for (const reply of replies) {
283+
for (let i = 0; i < replies.length; i++) {
284+
const reply = replies[i]
265285
const entry = storageRequests.get(reply.requestId)
266286
if (!entry) continue
267287
entry.replies.push(reply)
268288
waitingStorageRequests.delete(reply.requestId)
269289
foundRequests.add(entry)
270290
}
271291

272-
for (const entry of foundRequests) {
273-
entry.latch.unsafeOpen()
274-
}
292+
foundRequests.forEach((entry) => entry.latch.unsafeOpen())
275293
}
276294
}).pipe(
277295
Effect.interruptible,

0 commit comments

Comments
 (0)