Skip to content

Commit

Permalink
Merge branch 'main' into mumur3-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Dec 12, 2022
2 parents 80786b2 + 176d12c commit e935d4b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 27 deletions.
67 changes: 41 additions & 26 deletions shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type InnerChain = Vec<Transforms>;
pub struct BufferedChainMessages {
pub local_addr: SocketAddr,
pub messages: Messages,
pub flush: bool,
pub return_chan: Option<oneshot::Sender<crate::error::ChainResponse>>,
}

Expand All @@ -25,18 +26,21 @@ impl BufferedChainMessages {
BufferedChainMessages {
local_addr,
messages: m,
flush: false,
return_chan: None,
}
}

pub fn new(
m: Messages,
local_addr: SocketAddr,
flush: bool,
return_chan: oneshot::Sender<ChainResponse>,
) -> Self {
BufferedChainMessages {
local_addr,
messages: m,
flush,
return_chan: Some(return_chan),
}
}
Expand Down Expand Up @@ -98,6 +102,7 @@ impl BufferedChain {
.send(BufferedChainMessages::new(
wrapper.messages,
wrapper.local_addr,
wrapper.flush,
one_tx,
))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
Expand All @@ -106,7 +111,12 @@ impl BufferedChain {
Some(timeout) => {
self.send_handle
.send_timeout(
BufferedChainMessages::new(wrapper.messages, wrapper.local_addr, one_tx),
BufferedChainMessages::new(
wrapper.messages,
wrapper.local_addr,
wrapper.flush,
one_tx,
),
Duration::from_micros(timeout),
)
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
Expand All @@ -122,27 +132,34 @@ impl BufferedChain {
wrapper: Wrapper<'_>,
buffer_timeout_micros: Option<u64>,
) -> Result<()> {
match buffer_timeout_micros {
None => {
self.send_handle
.send(BufferedChainMessages::new_with_no_return(
wrapper.messages,
wrapper.local_addr,
))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
Some(timeout) => {
self.send_handle
.send_timeout(
BufferedChainMessages::new_with_no_return(
if wrapper.flush {
// To obey flush request we need to ensure messages have completed sending before returning.
// In order to achieve that we need to use the regular process_request method.
self.process_request(wrapper, buffer_timeout_micros).await?;
} else {
// When there is no flush we can return much earlier by not waiting for a response.
match buffer_timeout_micros {
None => {
self.send_handle
.send(BufferedChainMessages::new_with_no_return(
wrapper.messages,
wrapper.local_addr,
),
Duration::from_micros(timeout),
)
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
Some(timeout) => {
self.send_handle
.send_timeout(
BufferedChainMessages::new_with_no_return(
wrapper.messages,
wrapper.local_addr,
),
Duration::from_micros(timeout),
)
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
}
}
Ok(())
Expand Down Expand Up @@ -280,19 +297,17 @@ impl TransformChainBuilder {
local_addr,
return_chan,
messages,
flush,
}) = rx.recv().await
{
#[cfg(test)]
{
count_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}

let chain_response = chain
.process_request(
Wrapper::new_with_chain_name(messages, chain.name.clone(), local_addr),
chain.name.clone(),
)
.await;
let mut wrapper = Wrapper::new_with_chain_name(messages, chain.name.clone(), local_addr);
wrapper.flush = flush;
let chain_response = chain.process_request(wrapper, chain.name.clone()).await;

if let Err(e) = &chain_response {
error!("Internal error in buffered chain: {e:?}");
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl<'a> Clone for Wrapper<'a> {
client_details: self.client_details.clone(),
chain_name: self.chain_name.clone(),
local_addr: self.local_addr,
flush: false,
flush: self.flush,
}
}
}
Expand Down

0 comments on commit e935d4b

Please sign in to comment.