Skip to content

Commit 26d1267

Browse files
authored
ensure runner detects when it has been externally unregistered (#5508)
1 parent 1c4b6ae commit 26d1267

File tree

4 files changed

+43
-21
lines changed

4 files changed

+43
-21
lines changed

.changeset/thick-falcons-brush.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
ensure runner detects when it has been externally unregistered

packages/cluster/src/Runners.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -196,28 +196,27 @@ export const make: (options: Omit<Runners["Type"], "sendLocal" | "notifyLocal">)
196196
type StorageRequestEntry = {
197197
readonly latch: Effect.Latch
198198
doneLatch: Effect.Latch | undefined
199-
readonly messages: Array<Message.OutgoingRequest<any>>
199+
readonly messages: Set<Message.OutgoingRequest<any>>
200200
replies: Array<Reply.Reply<any>>
201201
}
202202
const storageRequests = new Map<Snowflake.Snowflake, StorageRequestEntry>()
203203
const waitingStorageRequests = new Map<Snowflake.Snowflake, Message.OutgoingRequest<any>>()
204204
const replyFromStorage = Effect.fnUntraced(
205205
function*(message: Message.OutgoingRequest<any>) {
206206
let entry = storageRequests.get(message.envelope.requestId)
207-
if (!entry) {
207+
if (entry) {
208+
entry.messages.add(message)
209+
entry.doneLatch ??= Effect.unsafeMakeLatch(false)
210+
return yield* entry.doneLatch.await
211+
} else {
208212
entry = {
209213
latch: Effect.unsafeMakeLatch(false),
210214
doneLatch: undefined,
211215
replies: [],
212-
messages: []
216+
messages: new Set([message])
213217
}
214218
storageRequests.set(message.envelope.requestId, entry)
215219
}
216-
entry.messages.push(message)
217-
if (entry.messages.length > 1) {
218-
entry.doneLatch ??= Effect.unsafeMakeLatch(false)
219-
return yield* entry.doneLatch.await
220-
}
221220

222221
while (true) {
223222
// wait for the storage loop to notify us
@@ -231,16 +230,16 @@ export const make: (options: Omit<Runners["Type"], "sendLocal" | "notifyLocal">)
231230
const reply = entry.replies[i]
232231
// we have reached the end
233232
if (reply._tag === "WithExit") {
234-
for (let j = 0; j < entry.messages.length; j++) {
235-
yield* entry.messages[j].respond(reply)
233+
for (const message of entry.messages) {
234+
yield* message.respond(reply)
236235
}
237236
entry.doneLatch?.unsafeOpen()
238237
return
239238
}
240239

241240
entry.latch.unsafeClose()
242-
for (let i = 0; i < entry.messages.length; i++) {
243-
yield* entry.messages[i].respond(reply)
241+
for (const message of entry.messages) {
242+
yield* message.respond(reply)
244243
}
245244
yield* entry.latch.await
246245
}
@@ -251,6 +250,11 @@ export const make: (options: Omit<Runners["Type"], "sendLocal" | "notifyLocal">)
251250
Effect.ensuring(
252251
effect,
253252
Effect.sync(() => {
253+
const entry = storageRequests.get(message.envelope.requestId)
254+
if (!entry || entry.messages.size > 1) {
255+
entry?.messages.delete(message)
256+
return
257+
}
254258
storageRequests.delete(message.envelope.requestId)
255259
waitingStorageRequests.delete(message.envelope.requestId)
256260
})

packages/cluster/src/ShardManager.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export class ShardManager extends Context.Tag("@effect/cluster/ShardManager")<Sh
5959
*/
6060
readonly shardingEvents: (
6161
address: Option.Option<RunnerAddress>
62-
) => Effect.Effect<Queue.Dequeue<ShardingEvent>, never, Scope>
62+
) => Effect.Effect<Queue.Dequeue<ShardingEvent>, RunnerNotRegistered, Scope>
6363
/**
6464
* Register a new runner with the cluster.
6565
*/
@@ -293,6 +293,7 @@ export class Rpcs extends RpcGroup.make(
293293
Rpc.make("ShardingEvents", {
294294
payload: { address: Schema.Option(RunnerAddress) },
295295
success: ShardingEventSchema,
296+
error: RunnerNotRegistered,
296297
stream: true
297298
}),
298299
Rpc.make("GetTime", {
@@ -744,11 +745,16 @@ export const make = Effect.gen(function*() {
744745

745746
return ShardManager.of({
746747
getAssignments,
747-
shardingEvents: (address) =>
748-
Effect.zipRight(
749-
Option.isSome(address) ? runnerHealthApi.onConnection(address.value) : Effect.void,
750-
PubSub.subscribe(events)
751-
),
748+
shardingEvents: (address) => {
749+
if (Option.isNone(address)) {
750+
return PubSub.subscribe(events)
751+
}
752+
return Effect.tap(PubSub.subscribe(events), () => {
753+
const isRegistered = MutableHashMap.has(state.allRunners, address.value)
754+
if (isRegistered) return
755+
return Effect.fail(new RunnerNotRegistered({ address: address.value }))
756+
})
757+
},
752758
register,
753759
unregister,
754760
rebalance,

packages/cluster/src/Sharding.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
EntityNotManagedByRunner,
3737
RunnerUnavailable
3838
} from "./ClusterError.js"
39+
import * as ClusterError from "./ClusterError.js"
3940
import { Persisted, Uninterruptible } from "./ClusterSchema.js"
4041
import * as ClusterSchema from "./ClusterSchema.js"
4142
import type { CurrentAddress, CurrentRunnerAddress, Entity, HandlersFrom } from "./Entity.js"
@@ -830,11 +831,12 @@ const make = Effect.gen(function*() {
830831

831832
yield* Effect.logDebug("Subscribing to sharding events")
832833
const mailbox = yield* shardManager.shardingEvents(config.runnerAddress)
833-
const startedLatch = yield* Deferred.make<void>()
834+
const startedLatch = yield* Deferred.make<void, ClusterError.RunnerNotRegistered>()
834835

835836
const eventsFiber = yield* Effect.gen(function*() {
836837
while (true) {
837-
const [events] = yield* mailbox.takeAll
838+
const [events, done] = yield* mailbox.takeAll
839+
if (done) return
838840
for (const event of events) {
839841
yield* Effect.logDebug("Received sharding event", event)
840842

@@ -868,11 +870,16 @@ const make = Effect.gen(function*() {
868870
}
869871
break
870872
}
873+
case "RunnerUnregistered": {
874+
if (!isLocalRunner(event.address)) break
875+
return yield* Effect.fail(new ClusterError.RunnerNotRegistered({ address: event.address }))
876+
}
871877
}
872878
}
873879
}
874880
}).pipe(
875881
Effect.intoDeferred(startedLatch),
882+
Effect.zipRight(Effect.dieMessage("Shard manager event stream down")),
876883
Effect.forkScoped
877884
)
878885

@@ -886,7 +893,7 @@ const make = Effect.gen(function*() {
886893
Effect.forkScoped
887894
)
888895

889-
yield* Fiber.joinAll([eventsFiber, syncFiber])
896+
return yield* Fiber.joinAll([eventsFiber, syncFiber])
890897
}).pipe(
891898
Effect.scoped,
892899
Effect.catchAllCause((cause) => Effect.logDebug(cause)),

0 commit comments

Comments
 (0)