-
Notifications
You must be signed in to change notification settings - Fork 690
feat: allow specifying consumer name for NATS queue + manually purge old messages #2740
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughImplements per-consumer delivery in NATS by adding an optional consumer name to NatsQueue, updates durable consumer creation and queue size lookup to respect it, adds a purge_up_to_sequence API affecting the stream, and introduces integration tests validating broadcast semantics and purge behavior. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as App
participant Q_A as NatsQueue(A)<br/><i>consumer=A</i>
participant Q_B as NatsQueue(B)<br/><i>consumer=B</i>
participant JS as NATS JetStream
rect rgb(235, 245, 255)
note over Q_A,JS: Setup
App->>Q_A: new_with_consumer("stream", "nats", timeout, "A")
App->>Q_B: new_with_consumer("stream", "nats", timeout, "B")
Q_A->>JS: Ensure pull consumer durable="A"
Q_B->>JS: Ensure pull consumer durable="B"
end
rect rgb(240, 255, 240)
note over App,JS: Publish
App->>JS: Publish messages M1..Mn to stream
end
rect rgb(255, 250, 235)
note over App,JS: Purge up to sequence S
App->>Q_A: purge_up_to_sequence(S)
Q_A->>JS: Purge stream messages < S
JS-->>Q_A: Purge result
end
rect rgb(245, 245, 255)
note over Q_A,Q_B: Consume post-purge
Q_A->>JS: Pull (durable="A")
JS-->>Q_A: Deliver messages >= S
Q_B->>JS: Pull (durable="B")
JS-->>Q_B: Deliver messages >= S
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (8)
lib/runtime/src/transports/nats.rs (8)
446-448: Promote default consumer name to a single constant; avoid magic string duplication."worker-group" appears in multiple places. Define a DEFAULT_CONSUMER_NAME and reuse to prevent drift.
Apply within changed ranges:
- .unwrap_or_else(|| "worker-group".to_string()), + .unwrap_or_else(|| DEFAULT_CONSUMER_NAME.to_string()),- let consumer_name = self - .consumer_name - .clone() - .unwrap_or_else(|| "worker-group".to_string()); + let consumer_name = self + .consumer_name + .clone() + .unwrap_or_else(|| DEFAULT_CONSUMER_NAME.to_string());Add this constant near other top-level consts:
const DEFAULT_CONSUMER_NAME: &str = "worker-group";
469-488: Sanitize/validate consumer_name to avoid invalid durable names.Mirror the stream_name sanitization to prevent slashes/backslashes (and potential server-side rejections).
pub fn new_with_consumer( stream_name: String, nats_server: String, dequeue_timeout: time::Duration, consumer_name: String, ) -> Self { let sanitized_stream_name = stream_name.replace(['/', '\\'], "_"); let subject = format!("{}.*", sanitized_stream_name); + let sanitized_consumer_name = consumer_name.replace(['/', '\\'], "_"); Self { stream_name: sanitized_stream_name, nats_server, dequeue_timeout, client: None, subject, subscriber: None, - consumer_name: Some(consumer_name), + consumer_name: Some(sanitized_consumer_name), } }
514-518: Durable name fallback is good; consider pinning key consumer policies explicitly.Relying on crate defaults (deliver/ack policy) can be brittle. Explicitly set deliver=All and ack=Explicit in the pull consumer config to lock semantics.
If you want to verify exact field names for async-nats pull consumer config in your current version, I can look it up and propose a concrete diff. Do you want me to fetch the latest docs?
596-602: Per-consumer queue size makes sense; add minor resilience.If the consumer was deleted out-of-band, get_consumer will error. Optionally, return 0 in that case or recreate the consumer to keep this API robust.
612-633: Guard no-op purges and fix doc link; purge builder reference points to sync crate.
- Add an early return for sequence <= 1 (no messages before the first).
- The comment links to nats (sync) docs; switch to async-nats or drop the link.
Apply within changed range:
pub async fn purge_up_to_sequence(&self, sequence: u64) -> Result<()> { if let Some(client) = &self.client { let stream = client.jetstream().get_stream(&self.stream_name).await?; + if sequence <= 1 { + log::debug!( + "purge_up_to_sequence: sequence <= 1; nothing to purge for stream {}", + self.stream_name + ); + return Ok(()); + } - // NOTE: this purge excludes the sequence itself - // https://docs.rs/nats/latest/nats/jetstream/struct.PurgeRequest.html + // NOTE: this purge excludes the sequence itself (see async-nats purge builder docs) stream.purge().sequence(sequence).await.map_err(|e| { anyhow::anyhow!("Failed to purge stream up to sequence {}: {}", sequence, e) })?;
846-965: Stabilize the integration test to avoid flakiness with zero fetch expiry.Use a small positive timeout and pass it to dequeue_task to avoid prematurely breaking the drain loop when messages are pending but not returned immediately.
- let timeout = time::Duration::from_secs(0); + let timeout = time::Duration::from_millis(50);- while let Some(msg) = queue1 - .dequeue_task(None) + while let Some(msg) = queue1 + .dequeue_task(Some(timeout)) .await .expect("Failed to dequeue from queue1") { consumer1_messages.push(msg); }- while let Some(msg) = queue2 - .dequeue_task(None) + while let Some(msg) = queue2 + .dequeue_task(Some(timeout)) .await .expect("Failed to dequeue from queue2") { consumer2_messages.push(msg); }
897-915: Clarify misleading comment; no stream info used for the purge decision.The code purges using a fixed sequence (3) and doesn’t read stream info here. Update the comment to reflect the actual behavior.
- // Get stream info to find the sequence numbers - // We need to know the sequence of message 2 to purge up to it + // Purge the first two messages by passing sequence=3 (purge is exclusive)
959-965: Optional cleanup: close queues before deleting the stream.Closing avoids dangling consumers on teardown (especially useful when un-ignoring this test).
- // Clean up by deleting the stream + // Clean up by closing consumers and deleting the stream + queue1.close().await.ok(); + queue2.close().await.ok(); client .jetstream() .delete_stream(&stream_name) .await .expect("Failed to delete test stream");
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
lib/runtime/src/transports/nats.rs(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
lib/runtime/src/transports/nats.rs (2)
lib/bindings/python/src/dynamo/_core.pyi (2)
NatsQueue(876-939)new(107-125)lib/bindings/python/rust/llm/nats.rs (1)
new(27-34)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
🔇 Additional comments (1)
lib/runtime/src/transports/nats.rs (1)
465-467: LGTM: Backward-compatible default preserved.Defaulting to None keeps existing worker-queue behavior intact.
…old messages (#2740) Signed-off-by: krishung5 <krish@nvidia.com>
…old messages (#2740) Signed-off-by: Jason Zhou <jasonzho@jasonzho-mlt.client.nvidia.com>
…old messages (#2740) Signed-off-by: Krishnan Prashanth <kprashanth@nvidia.com>
…old messages (#2740) Signed-off-by: nnshah1 <neelays@nvidia.com>
Add broadcast pattern and purge support to NatsQueue
Summary
Extends
NatsQueueto support multiple consumers independently receiving all messages (broadcast pattern) and adds stream purging to prevent unbounded storage growth.Motivation
Changes
1. Configurable Consumer Names
consumer_namefield toNatsQueuenew_with_consumer()constructor for creating queues with unique consumer names2. Stream Purging
purge_up_to_sequence()to permanently remove messages up to a specified sequenceTesting
Added
test_nats_queue_broadcast_with_purgeintegration test verifying:Backward Compatibility
Existing code using
NatsQueue::new()continues unchanged with default "worker-group" consumer.Summary by CodeRabbit