Skip to content

Commit 35a9c34

Browse files
committed
impl EventPublisher trait for NatsQueue
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent f99b569 commit 35a9c34

File tree

1 file changed

+51
-9
lines changed

1 file changed

+51
-9
lines changed

lib/runtime/src/transports/nats.rs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
//! - `NATS_AUTH_CREDENTIALS_FILE`: the path to the credentials file
2929
//!
3030
//! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together.
31+
use crate::traits::events::EventPublisher;
3132
use crate::{Result, metrics::MetricsRegistry};
3233

3334
use async_nats::connection::State;
3435
use async_nats::{Subscriber, client, jetstream};
36+
use async_trait::async_trait;
3537
use bytes::Bytes;
3638
use derive_builder::Builder;
3739
use futures::{StreamExt, TryStreamExt};
@@ -745,6 +747,39 @@ impl NatsQueue {
745747
}
746748
}
747749

750+
#[async_trait]
751+
impl EventPublisher for NatsQueue {
752+
fn subject(&self) -> String {
753+
self.stream_name.clone()
754+
}
755+
756+
async fn publish(
757+
&self,
758+
event_name: impl AsRef<str> + Send + Sync,
759+
event: &(impl Serialize + Send + Sync),
760+
) -> Result<()> {
761+
let bytes = serde_json::to_vec(event)?;
762+
self.publish_bytes(event_name, bytes).await
763+
}
764+
765+
async fn publish_bytes(
766+
&self,
767+
event_name: impl AsRef<str> + Send + Sync,
768+
bytes: Vec<u8>,
769+
) -> Result<()> {
770+
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
771+
772+
// Note: enqueue_task requires &mut self, but EventPublisher requires &self
773+
// We need to ensure the client is connected and use it directly
774+
if let Some(client) = &self.client {
775+
client.jetstream().publish(subject, bytes.into()).await?;
776+
Ok(())
777+
} else {
778+
Err(anyhow::anyhow!("Client not connected"))
779+
}
780+
}
781+
}
782+
748783
/// Prometheus metrics that mirror the NATS client statistics (in primitive types)
749784
/// to be used for the System Status Server.
750785
///
@@ -988,21 +1023,28 @@ mod tests {
9881023
queue1.connect().await.expect("Failed to connect queue1");
9891024
queue2.connect().await.expect("Failed to connect queue2");
9901025

991-
// Send 4 messages
992-
let messages = vec![
993-
Bytes::from("message1"),
994-
Bytes::from("message2"),
995-
Bytes::from("message3"),
996-
Bytes::from("message4"),
1026+
// Send 4 messages using the EventPublisher trait
1027+
let message_strings = vec![
1028+
"message1".to_string(),
1029+
"message2".to_string(),
1030+
"message3".to_string(),
1031+
"message4".to_string(),
9971032
];
9981033

999-
for msg in &messages {
1034+
// Using the EventPublisher trait to publish messages
1035+
for (idx, msg) in message_strings.iter().enumerate() {
10001036
queue1
1001-
.enqueue_task(msg.clone())
1037+
.publish("queue", msg)
10021038
.await
1003-
.expect("Failed to enqueue message");
1039+
.expect(&format!("Failed to publish message {}", idx + 1));
10041040
}
10051041

1042+
// Convert messages to JSON-serialized Bytes for comparison
1043+
let messages: Vec<Bytes> = message_strings
1044+
.iter()
1045+
.map(|s| Bytes::from(serde_json::to_vec(s).unwrap()))
1046+
.collect();
1047+
10061048
// Give JetStream a moment to persist the messages
10071049
tokio::time::sleep(time::Duration::from_millis(100)).await;
10081050

0 commit comments

Comments
 (0)