Skip to content

Commit

Permalink
fix for momento http with large payloads (iopsystems#288)
Browse files Browse the repository at this point in the history
Fixes the handling of large http payloads with the Momento HTTP API
by making sure we release capacity back to the connection and
stream when each chunk is read.

Adds the ability to log TLS keys to a key log file for easier
troubleshooting with packet captures.
  • Loading branch information
brayniac authored Oct 8, 2024
1 parent 5c05279 commit 7bbe5d1
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions src/clients/cache/momento/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::clients::common::*;
use crate::workload::{ClientRequest, ClientWorkItemKind};
use crate::*;
use rustls::KeyLogFile;

use async_channel::Receiver;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -56,11 +57,13 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe
roots: webpki_roots::TLS_SERVER_ROOTS.into(),
};

let config = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth(),
);
let mut config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

config.key_log = Arc::new(KeyLogFile::new());

let config = Arc::new(config);

while RUNNING.load(Ordering::Relaxed) {
if client.is_none() {
Expand All @@ -79,7 +82,9 @@ pub async fn pool_manager(endpoint: String, _config: Config, queue: Queue<SendRe
.await
.unwrap();

if let Ok((h2, connection)) = ::h2::client::handshake(stream).await {
let client_builder = ::h2::client::Builder::new().handshake(stream);

if let Ok((h2, connection)) = client_builder.await {
tokio::spawn(async move {
let _ = connection.await;
});
Expand Down Expand Up @@ -175,7 +180,7 @@ async fn task(
Ok((response, _)) => {
let response = response.await;

let response = match response {
let mut response = match response {
Ok(r) => r,
Err(_e) => {
GET_EX.increment();
Expand All @@ -191,11 +196,22 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = body.data().await {
while let Some(chunk) = response.body_mut().data().await {
if let Ok(b) = chunk {
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
GET_EX.increment();

RESPONSE_EX.increment();

continue;
}
} else {
GET_EX.increment();

Expand Down Expand Up @@ -266,10 +282,10 @@ async fn task(

let response = response.await;

let response = match response {
let mut response = match response {
Ok(r) => r,
Err(_e) => {
GET_EX.increment();
SET_EX.increment();

RESPONSE_EX.increment();

Expand All @@ -282,13 +298,25 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = body.data().await {
while let Some(chunk) = response.body_mut().data().await {
if let Ok(b) = chunk {
// info!("chunk for set: {}", b.len());
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
SET_EX.increment();

RESPONSE_EX.increment();

continue;
}
} else {
GET_EX.increment();
SET_EX.increment();

RESPONSE_EX.increment();

Expand Down Expand Up @@ -336,7 +364,7 @@ async fn task(
Ok((response, _)) => {
let response = response.await;

let response = match response {
let mut response = match response {
Ok(r) => r,
Err(_e) => {
GET_EX.increment();
Expand All @@ -352,13 +380,23 @@ async fn task(
// read the response body to completion

let mut buffer = BytesMut::new();
let mut body = response.into_body();

while let Some(chunk) = body.data().await {
while let Some(chunk) = response.body_mut().data().await {
if let Ok(b) = chunk {
buffer.extend_from_slice(&b);
if response
.body_mut()
.flow_control()
.release_capacity(b.len())
.is_err()
{
DELETE_EX.increment();
RESPONSE_EX.increment();

continue;
}
} else {
GET_EX.increment();
DELETE_EX.increment();

RESPONSE_EX.increment();

Expand Down

0 comments on commit 7bbe5d1

Please sign in to comment.