Skip to content

Commit

Permalink
Add no responders handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed May 25, 2022
1 parent c279362 commit a3c370c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 18 deletions.
35 changes: 28 additions & 7 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::num::{NonZeroU16, NonZeroU8};
use std::str::FromStr;

use subslice::SubsliceExt;
Expand Down Expand Up @@ -114,6 +114,8 @@ impl Connection {
headers: None,
subject,
payload,
status: None,
description: None,
}));
}
}
Expand Down Expand Up @@ -203,20 +205,31 @@ impl Connection {
}

let mut headers = HeaderMap::new();
let mut maybe_status: Option<NonZeroU16> = None;
let mut maybe_description: Option<String> = None;
if let Some(slice) = version_line.get("NATS/1.0".len()..).map(|s| s.trim()) {
match slice.split_once(' ') {
Some((status, description)) => {
if !status.is_empty() {
headers.append("Status", status.trim().try_into().unwrap());
maybe_status = Some(status.trim().parse().map_err(|_| {
std::io::Error::new(
io::ErrorKind::Other,
"could not covert Description header into header value",
)
})?);
}
if !description.is_empty() {
headers
.append("Description", status.trim().try_into().unwrap());
maybe_description = Some(description.trim().to_string());
}
}
None => {
if !slice.is_empty() {
headers.append("Status", slice.try_into().unwrap());
maybe_status = Some(slice.trim().parse().map_err(|_| {
std::io::Error::new(
io::ErrorKind::Other,
"could not covert Description header into header value",
)
})?);
}
}
}
Expand Down Expand Up @@ -251,6 +264,8 @@ impl Connection {
subject,
headers: Some(headers),
payload,
status: maybe_status,
description: maybe_description,
}));
}
}
Expand Down Expand Up @@ -524,6 +539,8 @@ mod read_op {
reply: None,
headers: None,
payload: "Hello World".into(),
status: None,
description: None,
})
);

Expand All @@ -541,6 +558,8 @@ mod read_op {
reply: Some("INBOX.34".into()),
headers: None,
payload: "Hello World".into(),
status: None,
description: None,
})
);

Expand All @@ -565,6 +584,8 @@ mod read_op {
"X".parse().unwrap()
)])),
payload: "Hello World".into(),
status: None,
description: None,
})
);
}
Expand Down Expand Up @@ -780,7 +801,7 @@ mod write_op {
pass: None,
auth_token: None,
headers: false,
no_responders: true,
no_responders: false,
}))
.await
.unwrap();
Expand All @@ -789,7 +810,7 @@ mod write_op {
reader.read_line(&mut buffer).await.unwrap();
assert_eq!(
buffer,
"CONNECT {\"verbose\":false,\"pedantic\":false,\"jwt\":null,\"nkey\":null,\"sig\":null,\"name\":null,\"echo\":false,\"lang\":\"Rust\",\"version\":\"1.0.0\",\"protocol\":1,\"tls_required\":false,\"user\":null,\"pass\":null,\"auth_token\":null,\"headers\":false,\"no_responders\":true}\r\n"
"CONNECT {\"verbose\":false,\"pedantic\":false,\"jwt\":null,\"nkey\":null,\"sig\":null,\"name\":null,\"echo\":false,\"lang\":\"Rust\",\"version\":\"1.0.0\",\"protocol\":1,\"tls_required\":false,\"user\":null,\"pass\":null,\"auth_token\":null,\"headers\":false,\"no_responders\":false}\r\n"
);
}
}
45 changes: 43 additions & 2 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ use std::cmp;
use std::collections::HashMap;
use std::iter;
use std::net::{SocketAddr, ToSocketAddrs};
use std::num::NonZeroU128;
use std::num::NonZeroU16;
use std::num::NonZeroU8;
use std::option;
use std::pin::Pin;
use std::slice;
Expand Down Expand Up @@ -223,6 +226,8 @@ pub(crate) enum ServerOp {
reply: Option<String>,
payload: Bytes,
headers: Option<HeaderMap>,
status: Option<NonZeroU16>,
description: Option<String>,
},
}

Expand Down Expand Up @@ -507,13 +512,17 @@ impl ConnectionHandler {
reply,
payload,
headers,
status,
description,
} => {
if let Some(subscription) = self.subscriptions.get_mut(&sid) {
let message = Message {
subject,
reply,
payload,
headers,
status,
description,
};

// if the channel for subscription was dropped, remove the
Expand Down Expand Up @@ -791,7 +800,15 @@ impl Client {
self.publish_with_reply(subject, inbox, payload).await?;
self.flush().await?;
match sub.next().await {
Some(message) => Ok(message),
Some(message) => {
if message.is_no_responders() {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
Expand All @@ -811,7 +828,15 @@ impl Client {
.await?;
self.flush().await?;
match sub.next().await {
Some(message) => Ok(message),
Some(message) => {
if message.is_no_responders() {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
Expand Down Expand Up @@ -1052,8 +1077,24 @@ pub struct Message {
pub reply: Option<String>,
pub payload: Bytes,
pub headers: Option<HeaderMap>,
status: Option<NonZeroU16>,
description: Option<String>,
}

impl Message {
fn is_no_responders(&self) -> bool {
if !self.payload.is_empty() {
return false;
}
if self.status == NonZeroU16::new(NO_RESPONDERS) {
return true;
}
false
}
}

const NO_RESPONDERS: u16 = 503;

/// Retrieves messages from given `subscription` created by [Client::subscribe].
///
/// Implements [futures::stream::Stream] for ergonomic async message processing.
Expand Down
14 changes: 5 additions & 9 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// limitations under the License.

mod client {
use async_nats::{ConnectOptions, ServerError};
use async_nats::{ConnectOptions, Message, ServerError};
use bytes::Bytes;
use futures::future::join_all;
use futures::stream::StreamExt;
Expand Down Expand Up @@ -198,17 +198,13 @@ mod client {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let resp = tokio::time::timeout(
tokio::time::Duration::from_millis(500),
tokio::time::timeout(
tokio::time::Duration::from_millis(300),
client.request("test".into(), "request".into()),
)
.await
.unwrap();
println!(
"headers: {:?}",
resp.unwrap().headers.unwrap().get("Status").unwrap()
);
// assert_eq!(resp.unwrap().payload, Bytes::from("reply"));
.unwrap()
.unwrap_err();
}

#[tokio::test]
Expand Down

0 comments on commit a3c370c

Please sign in to comment.