Skip to content

Commit

Permalink
Add large volume test
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-casperlabs committed Jan 22, 2024
1 parent a78a7fd commit f49741c
Showing 1 changed file with 187 additions and 4 deletions.
191 changes: 187 additions & 4 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,12 +847,12 @@ where

#[cfg(test)]
mod tests {
use std::{collections::BinaryHeap, sync::Arc, time::Duration};
use std::{collections::BinaryHeap, iter, sync::Arc, time::Duration};

use bytes::Bytes;
use futures::FutureExt;
use tokio::io::{DuplexStream, ReadHalf, WriteHalf};
use tracing::{span, Instrument, Level};
use tracing::{error_span, info, span, Instrument, Level};

use crate::{
io::IoCoreBuilder,
Expand Down Expand Up @@ -1177,18 +1177,20 @@ mod tests {
payload_steps: u32,
payload_range: u32,
pipe_buffer: usize,
min_send_bytes: usize,
timeout: Duration,
}

impl<const N: usize> Default for LargeVolumeTestSpec<N> {
fn default() -> Self {
Self {
max_frame_size: 37,
request_limit: 5,
request_limit: 3,
payload_steps: 20,
payload_range: 10,
pipe_buffer: 80,
timeout: Duration::from_secs(10),
min_send_bytes: 1024 * 1024,
timeout: Duration::from_millis(250),
}
}
}
Expand Down Expand Up @@ -1282,4 +1284,185 @@ mod tests {
assert_eq!(resp, pl);
}
}

#[tokio::test]
async fn run_large_volume_test_single_channel_single_request() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.ok();

let spec = LargeVolumeTestSpec {
request_limit: 1,
..Default::default()
};

large_volume_test::<1>(&spec).await;
}

async fn large_volume_test<const N: usize>(spec: &LargeVolumeTestSpec<N>) {
// Our setup is as follows:
//
// 1. All messages are `ACK`'d with empty responses.
// 2. Alice will send a constant stream of small messages to Bob.
// 3. Bob will send a larger message every time he receives a small message from Alice, on
// the same channel.

let channel_ids: Vec<ChannelId> = (0..N).map(|id| ChannelId::new(id as u8)).collect();

let (mut alice, mut bob) = LargeVolumeTestSpec::<N>::default().mk_rpc();

// Alice server. Will close the connection after enough bytes have been sent.
let mut remaining = spec.min_send_bytes;
let alice_server = tokio::spawn(
async move {
while let Some(request) = alice
.server
.next_request()
.await
.expect("next request failed")
{
let payload_size = request
.payload()
.as_ref()
.expect("should have payload in bobs request")
.len();
// Just discard the message payload, but acknowledge receiving it.
request.respond(None);

remaining = remaining.saturating_sub(payload_size);
if remaining == 0 {
// We've reached the volume we were looking for, end test.
break;
}
}

info!("exiting");
}
.instrument(error_span!("alice_server")),
);

let small_payload: Bytes = iter::repeat(0xFF)
.take(spec.payload_steps as usize / 2)
.collect::<Vec<u8>>()
.into();

// Alice client. Will shut down once bob closes the connection.
let alice_client = tokio::spawn(
async move {
let mut next_channel = channel_ids.iter().cloned().cycle();

let mut alice_counter = 0;
loop {
let small_request = alice
.client
.create_request(next_channel.next().unwrap())
.with_payload(small_payload.clone())
.queue_for_sending()
.await;
info!(alice_counter, "alice enqueued request");
alice_counter += 1;

match small_request.try_get_response() {
Ok(Ok(_)) => {
// A surprise to be sure, but a welcome one (very fast answer).
}
Ok(Err(err)) => match err {
RequestError::RemoteClosed(_) | RequestError::Shutdown => break,
RequestError::TimedOut
| RequestError::TimeoutOverflow(_)
| RequestError::RemoteCancelled
| RequestError::Cancelled
| RequestError::Error(_) => {
panic!("{}", err);
}
},

Err(guard) => {
// Not ready, but we are not going to wait.
tokio::spawn(async move {
if let Err(err) = guard.wait_for_response().await {
match err {
RequestError::RemoteClosed(_) | RequestError::Shutdown => {}
err => panic!("{}", err),
}
}
});
}
}
}

info!("exiting");
}
.instrument(error_span!("alice_client")),
);

// Bob server.
let todo_payload: Bytes = iter::repeat(0xFF)
.take(spec.payload_steps as usize / 2)
.collect::<Vec<u8>>()
.into();
let bob_server = tokio::spawn(
async move {
let mut bob_counter = 0;
while let Some(request) = bob
.server
.next_request()
.await
.expect("next request failed")
{
let channel = request.channel();
// Just discard the message payload, but acknowledge receiving it.
request.respond(None);

// Send another request back.
let bobs_request: RequestGuard = bob
.client
.create_request(channel)
.with_payload(todo_payload.clone())
.queue_for_sending()
.await;

info!(bob_counter, "bob enqueued request");
bob_counter += 1;

match bobs_request.try_get_response() {
Ok(Ok(_)) => {}
Ok(Err(err)) => match err {
RequestError::RemoteClosed(_) | RequestError::Shutdown => break,
RequestError::TimedOut
| RequestError::TimeoutOverflow(_)
| RequestError::RemoteCancelled
| RequestError::Cancelled
| RequestError::Error(_) => {
panic!("{}", err);
}
},

Err(guard) => {
// Do not wait, instead attempt to retrieve next request.
tokio::spawn(async move {
if let Err(err) = guard.wait_for_response().await {
match err {
RequestError::RemoteClosed(_) | RequestError::Shutdown => {}
err => panic!("{}", err),
}
}
});
}
}
// TODO: Put in substantial payload.
}

info!("exiting");
}
.instrument(error_span!("bob_server")),
);

alice_server.await.expect("failed to join alice server");
alice_client.await.expect("failed to join alice client");
bob_server.await.expect("failed to join bob server");

info!("all joined");
}
}

0 comments on commit f49741c

Please sign in to comment.