Fix durable inbox message loss via ListeningAgent coordination#2196
Fix durable inbox message loss via ListeningAgent coordination#2196jeremydmiller merged 3 commits intomainfrom
Conversation
Implements backpressure support for RabbitMQ when the durable inbox database is unavailable. The pause check is in WorkerQueueMessageConsumer's HandleBasicDeliverAsync to block message delivery while paused. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implements backpressure support for Redis streams when the durable inbox database is unavailable. Adds a spin-wait check in the ConsumerLoop before each read cycle. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…se is unavailable Replace per-listener ISupportConsumerPause approach with centralized coordination through ListeningAgent. When DurableReceiver detects inbox database failure, it signals the ListeningAgent to stop the listener and drain in-flight messages. A new InboxHealthRestarter probes the database with exponential backoff and restarts the listener when connectivity is restored. Also forces Kafka to disable auto-commit in durable mode to prevent offset advancement on unconsumed messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Hi @jeremydmiller, quick question on this one, I noticed you are disabling the EnableAutoOffsetStore, but you are not storing the offset anywhere else. I could be wrong, but if you don't store the offset, the commit won't have anything to push. When the application restarts, it will skip to the last message of the topic, causing messages to be "lost". This is because of auto.offset.reset, which defaults to Latest. However, if you leave the autocommit to be controlled by the user (ConsumerConfig), which by default uploads the stored offset every 5 secs, and only stores the offset on the CompleteAsync, that would enable the offset to be properly persisted in Kafka. You don't really want to store and commit every time because that could cause extra latency (having that handshake on every message). |
Summary
DurableReceiversignalListeningAgentto stop the transport listener and drain in-flight messagesInboxHealthRestarterthat probes the inbox database with exponential backoff (2s → 30s) and automatically restarts the listener when connectivity is restoredEnableAutoCommit=false/EnableAutoOffsetStore=falsein durable mode so offsets are only committed on successful inbox persistenceISupportConsumerPauseinterface — replaces per-listener pause/resume spin-waits (Kafka, RabbitMQ, Redis) with centralized coordination throughListeningAgentCloses #1708
Test plan
🤖 Generated with Claude Code