Skip to content

Commit

Permalink
Revert "fix for momento http with large payloads (iopsystems#288)"
Browse files Browse the repository at this point in the history
This reverts commit 7bbe5d1.
  • Loading branch information
brayniac committed Oct 11, 2024
1 parent 875ebb9 commit 9c65c71
Showing 1 changed file with 18 additions and 56 deletions.
74 changes: 18 additions & 56 deletions src/clients/cache/momento/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::clients::common::*;
use crate::workload::{ClientRequest, ClientWorkItemKind};
use crate::*;
use rustls::KeyLogFile;

use async_channel::Receiver;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -57,13 +56,11 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe
roots: webpki_roots::TLS_SERVER_ROOTS.into(),
};

let mut config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

config.key_log = Arc::new(KeyLogFile::new());

let config = Arc::new(config);
let config = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth(),
);

while RUNNING.load(Ordering::Relaxed) {
if client.is_none() {
Expand All @@ -81,9 +78,7 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe
.await
.unwrap();

let client_builder = ::h2::client::Builder::new().handshake(stream);

if let Ok((h2, connection)) = client_builder.await {
if let Ok((h2, connection)) = ::h2::client::handshake(stream).await {
tokio::spawn(async move {
let _ = connection.await;
});
Expand Down Expand Up @@ -179,7 +174,7 @@ async fn task(
Ok((response, _)) => {
let response = response.await;

let mut response = match response {
let response = match response {
Ok(r) => r,
Err(_e) => {
GET_EX.increment();
Expand All @@ -195,22 +190,11 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = response.body_mut().data().await {
while let Some(chunk) = body.data().await {
if let Ok(b) = chunk {
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
GET_EX.increment();

RESPONSE_EX.increment();

continue;
}
} else {
GET_EX.increment();

Expand Down Expand Up @@ -283,10 +267,10 @@ async fn task(

let response = response.await;

let mut response = match response {
let response = match response {
Ok(r) => r,
Err(_e) => {
SET_EX.increment();
GET_EX.increment();

RESPONSE_EX.increment();

Expand All @@ -299,25 +283,13 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = response.body_mut().data().await {
while let Some(chunk) = body.data().await {
if let Ok(b) = chunk {
// info!("chunk for set: {}", b.len());
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
SET_EX.increment();

RESPONSE_EX.increment();

continue;
}
} else {
SET_EX.increment();
GET_EX.increment();

RESPONSE_EX.increment();

Expand Down Expand Up @@ -367,7 +339,7 @@ async fn task(
Ok((response, _)) => {
let response = response.await;

let mut response = match response {
let response = match response {
Ok(r) => r,
Err(_e) => {
GET_EX.increment();
Expand All @@ -383,23 +355,13 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = response.body_mut().data().await {
while let Some(chunk) = body.data().await {
if let Ok(b) = chunk {
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
DELETE_EX.increment();
RESPONSE_EX.increment();

continue;
}
} else {
DELETE_EX.increment();
GET_EX.increment();

RESPONSE_EX.increment();

Expand Down

0 comments on commit 9c65c71

Please sign in to comment.