From d790f7ebe4da11e94cb8d5c4a8f0baa6cd78e268 Mon Sep 17 00:00:00 2001 From: Danny Browning Date: Sat, 25 Jan 2020 11:58:27 -0700 Subject: [PATCH 1/2] Switch to async synchronization primitives --- src/nats_client/client.rs | 36 +++--- src/nats_client/client_inner.rs | 113 +++++++---------- src/nats_client/mod.rs | 7 +- src/stan_client/client.rs | 208 ++++++++++++++------------------ src/stan_client/mod.rs | 6 +- tests/nats_client_test.rs | 7 +- 6 files changed, 164 insertions(+), 213 deletions(-) diff --git a/src/nats_client/client.rs b/src/nats_client/client.rs index 08235c7..bb3ab9e 100644 --- a/src/nats_client/client.rs +++ b/src/nats_client/client.rs @@ -3,7 +3,8 @@ use crate::ops::{Subscribe, Message, Publish}; use futures::{StreamExt}; use crate::nats_client::{NatsClient, NatsClientOptions, NatsClientInner, NatsSid, ReconnectHandler, NatsClientState}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use tokio::sync::RwLock; use crate::error::RatsioError; use futures::lock::Mutex; @@ -25,7 +26,7 @@ impl NatsClient { opts, server_info: RwLock::new(None), subscriptions: Arc::new(Mutex::new(HashMap::default())), - on_reconnect: std::sync::Mutex::new(None), + on_reconnect: tokio::sync::Mutex::new(None), state: RwLock::new(NatsClientState::Connecting), last_ping: RwLock::new(NatsClientInner::time_in_millis()), reconnect_version: RwLock::new(version), @@ -44,13 +45,17 @@ impl NatsClient { let arc_client = Arc::new(client); let reconn_client = arc_client.clone(); - - if let Ok(mut client_ref) = arc_client.inner.client_ref.write() { + { + let mut client_ref = arc_client.inner.client_ref.write().await; *client_ref = Some(arc_client.clone()); } - if let Ok(mut reconnect) = arc_client.inner.on_reconnect.lock() { - *reconnect = Some(Box::new(move || { reconn_client.on_reconnect() })); + { + let mut reconnect = arc_client.inner.on_reconnect.lock().await; + let reconnect_f = async move { + reconn_client.on_reconnect().await + }; + *reconnect = Some(Box::pin(reconnect_f)); } //heartbeat monitor @@ -143,19 +148,18 @@ impl NatsClient { self.inner.stop().await } - pub fn add_reconnect_handler(&self, handler: ReconnectHandler) -> Result<(), RatsioError> { - if let Ok(mut handlers) = self.reconnect_handlers.write() { - handlers.push(handler); - } + pub async fn add_reconnect_handler(&self, handler: ReconnectHandler) -> Result<(), RatsioError> { + let mut handlers = self.reconnect_handlers.write().await; + handlers.push(handler); + Ok(()) } - pub (in crate::nats_client) fn on_reconnect(&self) -> () { - if let Ok(handlers) = self.reconnect_handlers.read() { - let handlers: &Vec = handlers.as_ref(); - for handler in handlers { - handler(self) - } + pub (in crate::nats_client) async fn on_reconnect(&self) -> () { + let handlers = self.reconnect_handlers.read().await; + let handlers: &Vec = handlers.as_ref(); + for handler in handlers { + handler(self) } } } diff --git a/src/nats_client/client_inner.rs b/src/nats_client/client_inner.rs index eef05ef..2b36a70 100644 --- a/src/nats_client/client_inner.rs +++ b/src/nats_client/client_inner.rs @@ -79,12 +79,8 @@ impl NatsClientInner { let stream_self = self_arc.clone(); let _ = tokio::spawn(async move { while let Some(item) = stream.next().await { - let current_version = if let Ok(version) = stream_self.reconnect_version.read() { - *version - } else { - 1 - }; - if current_version != version { + let current_version = stream_self.reconnect_version.read().await; + if *current_version != version { break; } stream_self.process_nats_event(item).await @@ -107,9 +103,8 @@ impl NatsClientInner { jwt: None, }); self_arc.send_command(connect).await?; - if let Ok(mut state_guard) = self_arc.state.write() { - *state_guard = NatsClientState::Connected; - } + let mut state_guard = self_arc.state.write().await; + *state_guard = NatsClientState::Connected; Ok(()) } @@ -122,15 +117,14 @@ impl NatsClientInner { } pub(in crate::nats_client) async fn process_nats_event(&self, item: Op) { - self.ping_pong_reset(); + self.ping_pong_reset().await; match item { Op::CLOSE => { let _ = self.stop().await; } Op::INFO(server_info) => { - if let Ok(mut info) = self.server_info.write() { - *info = Some(server_info) - } + let mut info = self.server_info.write().await; + *info = Some(server_info) } Op::PING => { match self.send_command(Op::PONG).await { @@ -155,10 +149,9 @@ impl NatsClientInner { } } - pub(in crate::nats_client) fn ping_pong_reset(&self) { - if let Ok(mut last_ping) = self.last_ping.write() { - *last_ping = Self::time_in_millis(); - } + pub(in crate::nats_client) async fn ping_pong_reset(&self) { + let mut last_ping = self.last_ping.write().await; + *last_ping = Self::time_in_millis(); } pub(in crate::nats_client) async fn subscribe( @@ -226,12 +219,11 @@ impl NatsClientInner { } pub(in crate::nats_client) async fn stop(&self) -> Result<(), RatsioError> { - if let Ok(mut state_guard) = self.state.write() { - *state_guard = NatsClientState::Shutdown; - } - if let Ok(mut reconnect) = self.on_reconnect.lock() { - *reconnect = None; - } + let mut state_guard = self.state.write().await; + *state_guard = NatsClientState::Shutdown; + + let mut reconnect = self.on_reconnect.lock().await; + *reconnect = None; //Close all subscritions. let mut subscriptions = self.subscriptions.lock().await; @@ -244,60 +236,48 @@ impl NatsClientInner { let _ = self.send_command(cmd).await; } subscriptions.clear(); - if let Ok(mut client_ref) = self.client_ref.write() { - *client_ref = None - } + let mut client_ref = self.client_ref.write().await; + *client_ref = None; + Ok(()) } pub async fn reconnect(&self) -> Result<(), RatsioError> { - if let Ok(mut state_guard) = self.state.write() { - if *state_guard == NatsClientState::Disconnected { - *state_guard = NatsClientState::Reconnecting; - } else { - return Ok(()); - } + let mut state_guard = self.state.write().await; + if *state_guard == NatsClientState::Disconnected { + *state_guard = NatsClientState::Reconnecting; } else { - return Err(RatsioError::CannotReconnectToServer); + return Ok(()); } match self.do_reconnect().await { Ok(_) => { - if let Ok(mut state_guard) = self.state.write() { - *state_guard = NatsClientState::Connected; - } + let mut state_guard = self.state.write().await; + *state_guard = NatsClientState::Connected; Ok(()) } Err(err) => { error!("Error trying to reconnect to NATS {:?}", err); - if let Ok(mut state_guard) = self.state.write() { - *state_guard = NatsClientState::Disconnected; - } + let mut state_guard = self.state.write().await; + *state_guard = NatsClientState::Disconnected; Err(err) } } } async fn do_reconnect(&self) -> Result<(), RatsioError> { - let client_ref = if let Ok(client_ref_guard) = self.client_ref.read() { - if let Some(client_ref) = client_ref_guard.as_ref() { - client_ref.clone() - } else { - return Err(RatsioError::CannotReconnectToServer); - } + let client_ref_guard = self.client_ref.read().await; + let client_ref = if let Some(client_ref) = client_ref_guard.as_ref() { + client_ref.clone() } else { - return Err(RatsioError::InternalServerError); + return Err(RatsioError::CannotReconnectToServer); }; let tcp_stream = Self::try_connect(self.opts.clone(), &self.opts.cluster_uris.0, true).await?; let (sink, stream) = NatsTcpStream::new(tcp_stream).await.split(); *self.conn_sink.lock().await = sink; - let new_version = if let Ok(mut version) = self.reconnect_version.write() { - let new_version = *version + 1; - *version = new_version; - new_version - } else { - return Err(RatsioError::CannotReconnectToServer); - }; + let mut version = self.reconnect_version.write().await; + let new_version = *version + 1; + *version = new_version; info!("Reconnecting to NATS servers 4 - new version {}", new_version); let _ = NatsClientInner::start(client_ref.inner.clone(), new_version, stream).await?; if self.opts.subscribe_on_reconnect { @@ -313,7 +293,7 @@ impl NatsClientInner { } } } - client_ref.on_reconnect(); + client_ref.on_reconnect().await; Ok(()) } @@ -327,10 +307,9 @@ impl NatsClientInner { let ping_max_out = u128::from(self.opts.ping_max_out); loop { let _ = Delay::new(Duration::from_millis((ping_interval / 2) as u64)).await; - if let Ok(state_guard) = self.state.read() { - if *state_guard == NatsClientState::Shutdown { - break; - } + let state_guard = self.state.read().await; + if *state_guard == NatsClientState::Shutdown { + break; } let mut reconnect_required = false; @@ -344,21 +323,19 @@ impl NatsClientInner { if !reconnect_required { let _ = Delay::new(Duration::from_millis((ping_interval / 2) as u64)).await; let now = Self::time_in_millis(); - if let Ok(last_ping) = self.last_ping.read() { - if now - *last_ping > ping_interval { - error!("Missed ping interval") - } - if (now - *last_ping) > (ping_max_out * ping_interval) { - reconnect_required = true; - } + let last_ping = self.last_ping.read().await; + if now - *last_ping > ping_interval { + error!("Missed ping interval") + } + if (now - *last_ping) > (ping_max_out * ping_interval) { + reconnect_required = true; } } if reconnect_required { error!("Missed too many pings, reconnect is required."); - if let Ok(mut state_guard) = self.state.write() { - *state_guard = NatsClientState::Disconnected - } + let mut state_guard = self.state.write().await; + *state_guard = NatsClientState::Disconnected; let _ = self.reconnect().await; } } diff --git a/src/nats_client/mod.rs b/src/nats_client/mod.rs index 8cfb872..b5eba83 100644 --- a/src/nats_client/mod.rs +++ b/src/nats_client/mod.rs @@ -4,7 +4,10 @@ mod client_inner; use crate::net::nats_tcp_stream::NatsTcpStream; use crate::ops::{ServerInfo, Op, Message, Subscribe}; -use std::sync::{Arc, RwLock}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::RwLock; use std::fmt::Debug; use futures::stream::{ SplitSink}; @@ -111,7 +114,7 @@ pub struct NatsClientInner { /// Server info server_info: RwLock>, subscriptions: Arc, Subscribe)>>>, - on_reconnect: std::sync::Mutex () + Send + Sync>>>, + on_reconnect: tokio::sync::Mutex + Send + Sync>>>>, state: RwLock, last_ping: RwLock, client_ref: RwLock>>, diff --git a/src/stan_client/client.rs b/src/stan_client/client.rs index 761e622..75863bd 100644 --- a/src/stan_client/client.rs +++ b/src/stan_client/client.rs @@ -1,5 +1,5 @@ use crate::stan_client::{StanClient, StanOptions, DEFAULT_DISCOVER_PREFIX, ClientInfo, StanMessage, StartPosition, DEFAULT_MAX_INFLIGHT, DEFAULT_ACK_WAIT, StanSubscribe, StanSid, Subscription, AckHandler}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use crate::error::RatsioError; use crate::nuid::NUID; use crate::nats_client::{NatsClient, ClosableMessage}; @@ -8,8 +8,8 @@ use prost::Message; use nom::lib::std::collections::HashMap; use futures::{Stream, StreamExt}; use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::RwLock; use sha2::{Digest, Sha256}; -use std::ops::Deref; use pin_project::pin_project; use std::task::{Context, Poll}; use std::pin::Pin; @@ -37,9 +37,9 @@ impl StanClient { id_gen })); - let conn_id = id_generator.write().unwrap().next(); + let conn_id = id_generator.write().await.next(); debug!("Connection id => {}", &conn_id); - let heartbeat_inbox: String = format!("_HB.{}", id_generator.write().unwrap().next()); + let heartbeat_inbox: String = format!("_HB.{}", id_generator.write().await.next()); let discover_subject: String = format!("{}.{}", DEFAULT_DISCOVER_PREFIX, options.cluster_id); let client_id = options.client_id.clone(); @@ -77,7 +77,7 @@ impl StanClient { subscriptions: RwLock::new(HashMap::default()), self_reference:RwLock::new(None), }); - *stan_client.self_reference.write().unwrap() = Some(stan_client.clone()); + *stan_client.self_reference.write().await = Some(stan_client.clone()); tokio::spawn(async move { let _ = StanClient::process_heartbeats( @@ -85,14 +85,13 @@ impl StanClient { client_id.clone(), heartbeat_inbox.clone()).await; }); - let reconnect_stan_client = stan_client.clone(); stan_client.nats_client.add_reconnect_handler(Box::new(move |_nats_client| { let stan_client = reconnect_stan_client.clone(); tokio::spawn(async move { let _ = stan_client.clone().on_reconnect().await; }); - }))?; + })).await?; Ok(stan_client) } @@ -107,7 +106,7 @@ impl StanClient { client_id: client_id.clone(), conn_id: conn_id.clone(), subject: heartbeat_inbox.clone(), - guid: id_generator.write().map(|mut id_gen| id_gen.next()).unwrap_or_default(), + guid: id_generator.write().await.next(), ..Default::default() }; @@ -127,8 +126,7 @@ impl StanClient { } async fn on_reconnect(&self) -> Result<(), RatsioError> { - let close_requests = self.client_info.read() - .map(|ci| ci.deref().close_requests.clone()).unwrap_or_default(); + let close_requests = self.client_info.read().await.close_requests.clone(); //We may need to disconnect first . let nats_client = self.nats_client.clone(); let close_request = protocol::CloseRequest { @@ -138,13 +136,12 @@ impl StanClient { close_request.encode(&mut close_req_buf).unwrap(); let _ = nats_client.publish(close_requests.clone(), &close_req_buf[..]).await?; - let conn_id = self.id_generator.write() - .map(|mut i| i.next()).unwrap_or_default(); - if let Ok(mut old_conn_id) = self.conn_id.write() { - *old_conn_id = conn_id.clone().into_bytes().clone(); - } + let conn_id = self.id_generator.write().await.next(); + let mut old_conn_id = self.conn_id.write().await; + *old_conn_id = conn_id.clone().into_bytes().clone(); + debug!("Connection id => {}", &conn_id); - let heartbeat_inbox: String = format!("_HB.{}", self.id_generator.write().unwrap().next()); + let heartbeat_inbox: String = format!("_HB.{}", self.id_generator.write().await.next()); let discover_subject: String = format!("{}.{}", DEFAULT_DISCOVER_PREFIX, self.options.cluster_id); let client_id = self.options.client_id.clone(); @@ -163,12 +160,11 @@ impl StanClient { let connect_response = protocol::ConnectResponse::decode(connect_response.payload.as_slice())?; let client_info: ClientInfo = connect_response.into(); - if let Ok(mut client_info_guard) = self.client_info.write() { - *client_info_guard = client_info.clone() - } - if let Ok(mut conn_id_guard) = self.conn_id.write() { - *conn_id_guard = conn_id.clone().into_bytes(); - } + let mut client_info_guard = self.client_info.write().await; + *client_info_guard = client_info.clone(); + + let mut conn_id_guard = self.conn_id.write().await; + *conn_id_guard = conn_id.clone().into_bytes(); let hb_id_generator = self.id_generator.clone(); let hb_nats_client = self.nats_client.clone(); @@ -178,13 +174,8 @@ impl StanClient { client_id.clone(), heartbeat_inbox.clone()).await; }); - let subscriptions = self.subscriptions.write().map(|m| { - let mut subs: Vec = Vec::new(); - for n in m.iter() { - subs.push(n.1.clone()) - } - subs - }).unwrap_or_default(); + let subscriptions = self.subscriptions.write().await; + let subscriptions = subscriptions.values().map(|s| s.clone()); for sub in subscriptions { let _ = self.re_subscribe(&client_info, sub).await; @@ -193,8 +184,7 @@ impl StanClient { } async fn re_subscribe(&self, client_info: &ClientInfo, sub: Subscription) -> Result<(), RatsioError> { - let inbox: String = format!("_SUB.{}", self.id_generator.write() - .map(|mut id_gen| id_gen.next()).unwrap_or_default()); + let inbox: String = format!("_SUB.{}", self.id_generator.write().await.next()); let sub_request = protocol::SubscriptionRequest { client_id: self.client_id.clone(), @@ -217,29 +207,28 @@ impl StanClient { let sub_response = protocol::SubscriptionResponse::decode(&sub_response.payload[..]).unwrap(); let ack_inbox = sub_response.ack_inbox.clone(); let (sid, mut subscription) = self.nats_client.subscribe(inbox.clone()).await?; - if let Ok(mut subscriptions) = self.subscriptions.write() { - let stan_sid = StanSid(sid); - let new_sub = Subscription { - client_id: self.client_id.clone(), - subject: sub.subject.clone(), - durable_name: sub.durable_name.clone(), - queue_group: sub.queue_group.clone(), - max_in_flight: sub.max_in_flight, - ack_wait_in_secs: sub.ack_wait_in_secs, - inbox: sub.inbox.clone(), - ack_inbox: ack_inbox.clone(), - unsub_requests: client_info.unsub_requests.clone(), - close_requests: client_info.close_requests.clone(), - sender: sender.clone(), - }; - subscriptions.insert((stan_sid.0).0.clone(), new_sub); + let mut subscriptions = self.subscriptions.write().await; + let stan_sid = StanSid(sid); + let new_sub = Subscription { + client_id: self.client_id.clone(), + subject: sub.subject.clone(), + durable_name: sub.durable_name.clone(), + queue_group: sub.queue_group.clone(), + max_in_flight: sub.max_in_flight, + ack_wait_in_secs: sub.ack_wait_in_secs, + inbox: sub.inbox.clone(), + ack_inbox: ack_inbox.clone(), + unsub_requests: client_info.unsub_requests.clone(), + close_requests: client_info.close_requests.clone(), + sender: sender.clone(), + }; + subscriptions.insert((stan_sid.0).0.clone(), new_sub); - tokio::spawn(async move { - while let Some(nats_msg) = subscription.next().await { - let _ = sender.send(ClosableMessage::Message(nats_msg)); - } - }); - } + tokio::spawn(async move { + while let Some(nats_msg) = subscription.next().await { + let _ = sender.send(ClosableMessage::Message(nats_msg)); + } + }); } Ok(()) } @@ -299,10 +288,8 @@ impl StanClient { max_in_flight: i32, ack_wait_in_secs: i32, start_position: StartPosition, start_sequence: u64, start_time_delta: Option, manual_acks: bool, ) -> Result<(StanSid, impl Stream + Send + Sync), RatsioError> { - let client_info = self.client_info.read().map(|e| e.clone()) - .unwrap_or_else(|_| Default::default()); - let inbox: String = format!("_SUB.{}", self.id_generator.write() - .map(|mut id_gen| id_gen.next()).unwrap_or_default()); + let client_info = self.client_info.read().await.clone(); + let inbox: String = format!("_SUB.{}", self.id_generator.write().await.next()); let sub_request = protocol::SubscriptionRequest { client_id: self.client_id.clone(), subject: subject.to_string(), @@ -327,42 +314,39 @@ impl StanClient { let sub_response = protocol::SubscriptionResponse::decode(&sub_response.payload[..]).unwrap(); let ack_inbox = sub_response.ack_inbox.clone(); let (sid, mut subscription) = self.nats_client.subscribe(inbox.clone()).await?; - if let Ok(mut subscriptions) = self.subscriptions.write() { - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let stan_sid = StanSid(sid); - let sub = Subscription { - client_id: self.client_id.clone(), - subject: subject.clone(), - durable_name: durable_name.clone(), - queue_group: queue_group.clone(), - max_in_flight: max_in_flight, - ack_wait_in_secs: ack_wait_in_secs, - inbox: inbox.clone(), - ack_inbox: ack_inbox.clone(), - unsub_requests: client_info.unsub_requests.clone(), - close_requests: client_info.close_requests.clone(), - sender: sender.clone(), - }; - subscriptions.insert((stan_sid.0).0.clone(), sub); + let mut subscriptions = self.subscriptions.write().await; + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let stan_sid = StanSid(sid); + let sub = Subscription { + client_id: self.client_id.clone(), + subject: subject.clone(), + durable_name: durable_name.clone(), + queue_group: queue_group.clone(), + max_in_flight: max_in_flight, + ack_wait_in_secs: ack_wait_in_secs, + inbox: inbox.clone(), + ack_inbox: ack_inbox.clone(), + unsub_requests: client_info.unsub_requests.clone(), + close_requests: client_info.close_requests.clone(), + sender: sender.clone(), + }; + subscriptions.insert((stan_sid.0).0.clone(), sub); - tokio::spawn(async move { - while let Some(nats_msg) = subscription.next().await { - let _ = sender.send(ClosableMessage::Message(nats_msg)); - } - }); - Ok((stan_sid, StanClosableReceiver{ - receiver, ack_inbox, manual_acks, stan_client:self.get_self_reference() - })) - } else { - Err(RatsioError::InternalServerError) - } + tokio::spawn(async move { + while let Some(nats_msg) = subscription.next().await { + let _ = sender.send(ClosableMessage::Message(nats_msg)); + } + }); + Ok((stan_sid, StanClosableReceiver{ + receiver, ack_inbox, manual_acks, stan_client:self.get_self_reference().await + })) } else { Err(RatsioError::InternalServerError) } } - fn get_self_reference(&self) -> Arc{ - self.self_reference.read().unwrap().clone().unwrap() + async fn get_self_reference(&self) -> Arc{ + self.self_reference.read().await.clone().unwrap() } async fn ack_message( @@ -412,10 +396,8 @@ impl StanClient { let mut hasher = Sha256::new(); hasher.input(payload); - let conn_id = self.conn_id.read() - .map(|i| i.clone()).unwrap_or_default(); - let guid = self.id_generator.write() - .map(|mut i| i.next()).unwrap_or_default(); + let conn_id = self.conn_id.read().await.clone(); + let guid = self.id_generator.write().await.next(); let pub_msg = protocol::PubMsg { sha256: Vec::from(&hasher.result()[..]), client_id: self.client_id.clone(), @@ -428,26 +410,15 @@ impl StanClient { let mut pub_req_buf: Vec = Vec::with_capacity(64); pub_msg.encode(&mut pub_req_buf).unwrap(); - if let Ok(client_info) = self.client_info.read() { - self.nats_client.publish(format!( - "{}.{}", client_info.pub_prefix, subject - ), pub_req_buf.as_slice()).await - } else { - Err(RatsioError::InternalServerError) - } + let client_info = self.client_info.read().await; + self.nats_client.publish(format!( + "{}.{}", client_info.pub_prefix, subject + ), pub_req_buf.as_slice()).await } pub async fn un_subscribe(&self, stan_sid: &StanSid) -> Result<(), RatsioError> { - let client_info = if let Ok(client_info) = self.client_info.read() { - client_info - } else { - return Err(RatsioError::InternalServerError); - }; - let mut subscriptions = if let Ok(subscriptions) = self.subscriptions.write() { - subscriptions - } else { - return Err(RatsioError::InternalServerError); - }; + let client_info = self.client_info.read().await; + let mut subscriptions = self.subscriptions.write().await; if let Some(subscription) = subscriptions.remove(&(stan_sid.0).0) { let unsub_msg = protocol::UnsubscribeRequest { client_id: self.client_id.clone(), @@ -468,17 +439,16 @@ impl StanClient { } pub async fn close(&self) -> Result<(), RatsioError> { - if let Ok(ref client_info) = self.client_info.read() { - let nats_client = self.nats_client.clone(); - let client_id = self.client_id.clone(); - let close_request = protocol::CloseRequest { - client_id, - }; - let mut close_req_buf: Vec = Vec::with_capacity(64); - close_request.encode(&mut close_req_buf).unwrap(); - let _ = nats_client.publish(client_info.close_requests.clone(), &close_req_buf[..]).await?; - } - *self.self_reference.write().unwrap() = None; + let client_info = self.client_info.read().await; + let nats_client = self.nats_client.clone(); + let client_id = self.client_id.clone(); + let close_request = protocol::CloseRequest { + client_id, + }; + let mut close_req_buf: Vec = Vec::with_capacity(64); + close_request.encode(&mut close_req_buf).unwrap(); + let _ = nats_client.publish(client_info.close_requests.clone(), &close_req_buf[..]).await?; + *self.self_reference.write().await = None; self.nats_client.close().await } } diff --git a/src/stan_client/mod.rs b/src/stan_client/mod.rs index 37ba02d..4df7e80 100644 --- a/src/stan_client/mod.rs +++ b/src/stan_client/mod.rs @@ -3,11 +3,9 @@ use crate::nuid::NUID; use std::{ collections::HashMap, - sync::{ - Arc, - RwLock, - }, + sync::Arc, }; +use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; use failure::_core::fmt::{Debug, Formatter, Error}; diff --git a/tests/nats_client_test.rs b/tests/nats_client_test.rs index 55f0d5f..0f6019e 100644 --- a/tests/nats_client_test.rs +++ b/tests/nats_client_test.rs @@ -2,7 +2,6 @@ use ratsio::nats_client::NatsClient; use log::info; use futures::StreamExt; use ratsio::error::RatsioError; -use prost::Message; use ratsio::protocol; use ratsio::nuid; @@ -56,16 +55,16 @@ async fn test1() -> Result<(), RatsioError> { let _ = nats_client.publish("foo", b"Publish Message 2").await?; - let discover_subject = "_STAN.discover.test-cluster"; + let _discover_subject = "_STAN.discover.test-cluster"; let client_id = "test-1"; let conn_id = nuid::next(); let heartbeat_inbox = format!("_HB.{}", &conn_id); - let connect_request = protocol::ConnectRequest { + let _connect_request = protocol::ConnectRequest { client_id: client_id.into(), conn_id: conn_id.clone().as_bytes().into(), heartbeat_inbox: heartbeat_inbox.clone(), ..Default::default() }; - thread::sleep(time::Duration::from_secs(2)); + tokio::time::delay_for(std::time::Duration::from_secs(2)).await; Ok(()) } \ No newline at end of file From 1fae74be3227055ffe2c3f97241158cca83adacd Mon Sep 17 00:00:00 2001 From: Danny Browning Date: Sat, 25 Jan 2020 12:13:07 -0700 Subject: [PATCH 2/2] Move format out of async call --- src/stan_client/client.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stan_client/client.rs b/src/stan_client/client.rs index 75863bd..14cff52 100644 --- a/src/stan_client/client.rs +++ b/src/stan_client/client.rs @@ -411,9 +411,10 @@ impl StanClient { let mut pub_req_buf: Vec = Vec::with_capacity(64); pub_msg.encode(&mut pub_req_buf).unwrap(); let client_info = self.client_info.read().await; - self.nats_client.publish(format!( + let subject = format!( "{}.{}", client_info.pub_prefix, subject - ), pub_req_buf.as_slice()).await + ); + self.nats_client.publish(subject, pub_req_buf.as_slice()).await } pub async fn un_subscribe(&self, stan_sid: &StanSid) -> Result<(), RatsioError> {