diff --git a/src/clients/cache/momento/http.rs b/src/clients/cache/momento/http.rs index 1736f1f..916b3c5 100644 --- a/src/clients/cache/momento/http.rs +++ b/src/clients/cache/momento/http.rs @@ -1,6 +1,7 @@ use crate::clients::common::*; use crate::workload::{ClientRequest, ClientWorkItemKind}; use crate::*; +use rustls::KeyLogFile; use async_channel::Receiver; use bytes::{Bytes, BytesMut}; @@ -56,11 +57,13 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue { let response = response.await; - let response = match response { + let mut response = match response { Ok(r) => r, Err(_e) => { GET_EX.increment(); @@ -191,11 +196,22 @@ async fn task( // read the response body to completion let mut buffer = BytesMut::new(); - let mut body = response.into_body(); - while let Some(chunk) = body.data().await { + while let Some(chunk) = response.body_mut().data().await { if let Ok(b) = chunk { buffer.extend_from_slice(&b); + if response + .body_mut() + .flow_control() + .release_capacity(b.len()) + .is_err() + { + GET_EX.increment(); + + RESPONSE_EX.increment(); + + continue; + } } else { GET_EX.increment(); @@ -266,10 +282,10 @@ async fn task( let response = response.await; - let response = match response { + let mut response = match response { Ok(r) => r, Err(_e) => { - GET_EX.increment(); + SET_EX.increment(); RESPONSE_EX.increment(); @@ -282,13 +298,25 @@ async fn task( // read the response body to completion let mut buffer = BytesMut::new(); - let mut body = response.into_body(); - while let Some(chunk) = body.data().await { + while let Some(chunk) = response.body_mut().data().await { if let Ok(b) = chunk { + // info!("chunk for set: {}", b.len()); buffer.extend_from_slice(&b); + if response + .body_mut() + .flow_control() + .release_capacity(b.len()) + .is_err() + { + SET_EX.increment(); + + RESPONSE_EX.increment(); + + continue; + } } else { - GET_EX.increment(); + SET_EX.increment(); RESPONSE_EX.increment(); @@ -336,7 +364,7 @@ async fn task( Ok((response, _)) => { let response = response.await; - let response = match response { + let mut response = match response { Ok(r) => r, Err(_e) => { GET_EX.increment(); @@ -352,13 +380,23 @@ async fn task( // read the response body to completion let mut buffer = BytesMut::new(); - let mut body = response.into_body(); - while let Some(chunk) = body.data().await { + while let Some(chunk) = response.body_mut().data().await { if let Ok(b) = chunk { buffer.extend_from_slice(&b); + if response + .body_mut() + .flow_control() + .release_capacity(b.len()) + .is_err() + { + DELETE_EX.increment(); + RESPONSE_EX.increment(); + + continue; + } } else { - GET_EX.increment(); + DELETE_EX.increment(); RESPONSE_EX.increment();