Skip to content

Commit

Permalink
feat(netbench): add built-in drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Mar 7, 2022
1 parent 483667d commit bca2e3e
Show file tree
Hide file tree
Showing 18 changed files with 1,341 additions and 26 deletions.
25 changes: 25 additions & 0 deletions netbench/netbench-driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "netbench-driver"
version = "0.1.0"
authors = ["AWS s2n"]
edition = "2018"
license = "Apache-2.0"

[dependencies]
bytes = "1"
futures = "0.3"
netbench = { version = "0.1", path = "../netbench" }
s2n-quic = { version = "1", path = "../../quic/s2n-quic", features = ["provider-tls-s2n"] }
s2n-quic-core = { version = "0.1", path = "../../quic/s2n-quic-core", features = ["testing"] }
structopt = "0.3"
tokio = { version = "1", features = ["io-util", "net", "time"] }
tokio-native-tls = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Pin to this version until s2n-tls supports OpenSSL 3.0
# Build the vendored version to make it easy to test in dev
#
# NOTE: The version of the `openssl-sys` crate is not the same as OpenSSL itself.
# Versions 1.0.1 - 3.0.0 are automatically discovered.
openssl-sys = { version = "<= 0.9.68", features = ["vendored"] }
22 changes: 22 additions & 0 deletions netbench/netbench-driver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# netbench-driver

This crate contains drivers for each transport protocol.

## Running driver tests

Netbench will have a dedicated CLI to automate running tests. Until then, the drivers can be used directly.

```
export DRIVER=s2n-quic
cargo build --release --bin netbench-driver-$DRIVER-server --bin netbench-driver-$DRIVER-client --bin netbench-scenarios
./target/release/netbench-scenarios
./target/release/netbench-driver-$DRIVER-server ./target/netbench/request_response.json
```

In another terminal

```
export DRIVER=s2n-quic
export SERVER_0=localhost:4433
./target/release/netbench-driver-$DRIVER-client ./target/netbench/request_response.json
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use netbench::{multiplex, scenario, Result};
use std::{collections::HashSet, future::Future, net::SocketAddr, pin::Pin, sync::Arc};
use structopt::StructOpt;
use tokio::{io::AsyncWriteExt, net::TcpStream};
use tokio_native_tls::{
native_tls::{Certificate, TlsConnector},
TlsStream,
};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
Client::from_args().run().await
}

#[derive(Debug, StructOpt)]
pub struct Client {
#[structopt(flatten)]
opts: netbench_driver::Client,
}

impl Client {
pub async fn run(&self) -> Result<()> {
let addresses = self.opts.address_map().await?;
let scenario = self.opts.scenario();

let client = self.client()?;
let client = netbench::Client::new(client, &scenario, &addresses);
let mut trace = self.opts.trace();
let mut checkpoints = HashSet::new();
let mut timer = netbench::timer::Tokio::default();
client.run(&mut trace, &mut checkpoints, &mut timer).await?;

Ok(())
}

fn client(&self) -> Result<ClientImpl> {
let mut builder = TlsConnector::builder();
for ca in self.opts.certificate_authorities() {
let ca = Certificate::from_pem(ca.pem.as_bytes())?;
builder.add_root_certificate(ca);
}
let connector = builder.build()?;
let connector: tokio_native_tls::TlsConnector = connector.into();
let connector = Arc::new(connector);

let config = multiplex::Config::default();

Ok(ClientImpl { config, connector })
}
}

type Connection<'a> = netbench::Driver<'a, multiplex::Connection<TlsStream<TcpStream>>>;

#[derive(Clone, Debug)]
struct ClientImpl {
config: multiplex::Config,
connector: Arc<tokio_native_tls::TlsConnector>,
}

impl<'a> netbench::client::Client<'a> for ClientImpl {
type Connect = Pin<Box<dyn Future<Output = Result<Self::Connection>> + 'a>>;
type Connection = Connection<'a>;

fn connect(
&mut self,
addr: SocketAddr,
server_name: &str,
server_conn_id: u64,
scenario: &'a Arc<scenario::Connection>,
) -> Self::Connect {
let config = self.config.clone();
let connector = self.connector.clone();
let server_name = server_name.to_string();

let fut = async move {
let conn = TcpStream::connect(addr).await?;
let mut conn = connector.connect(&server_name, conn).await?;

// The native-tls crate does not expose the server name on the server so we need to
// write the connection id for now.
conn.write_u64(server_conn_id).await?;

let conn = Box::pin(conn);
let conn = multiplex::Connection::new(conn, config);
let conn: Connection = netbench::Driver::new(scenario, conn);

Result::Ok(conn)
};

Box::pin(fut)
}
}
101 changes: 101 additions & 0 deletions netbench/netbench-driver/src/bin/netbench-driver-native-tls-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use netbench::{multiplex, scenario, Result, Timer};
use std::{collections::HashSet, sync::Arc};
use structopt::StructOpt;
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
spawn,
};
use tokio_native_tls::native_tls::{Identity, TlsAcceptor};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
Server::from_args().run().await
}

#[derive(Debug, StructOpt)]
pub struct Server {
#[structopt(flatten)]
opts: netbench_driver::Server,
}

impl Server {
pub async fn run(&self) -> Result<()> {
let scenario = self.opts.scenario();

let server = self.server().await?;

let trace = self.opts.trace();
let ident = self.identity()?;
let acceptor = TlsAcceptor::builder(ident).build()?;
let acceptor: tokio_native_tls::TlsAcceptor = acceptor.into();
let acceptor = Arc::new(acceptor);

// TODO load configuration from scenario
let config = netbench::multiplex::Config::default();

let mut conn_id = 0;
loop {
let (connection, _addr) = server.accept().await?;
let scenario = scenario.clone();
let id = conn_id;
conn_id += 1;
let acceptor = acceptor.clone();
let trace = trace.clone();
let config = config.clone();
spawn(async move {
if let Err(err) =
handle_connection(acceptor, connection, id, scenario, trace, config).await
{
eprintln!("error: {}", err);
}
});
}

async fn handle_connection(
acceptor: Arc<tokio_native_tls::TlsAcceptor>,
connection: TcpStream,
conn_id: u64,
scenario: Arc<scenario::Server>,
mut trace: impl netbench::Trace,
config: multiplex::Config,
) -> Result<()> {
let mut connection = acceptor.accept(connection).await?;
let server_idx = connection.read_u64().await?;
let scenario = scenario
.connections
.get(server_idx as usize)
.ok_or("invalid connection id")?;
let connection = Box::pin(connection);

let conn = netbench::Driver::new(
scenario,
netbench::multiplex::Connection::new(connection, config),
);

let mut checkpoints = HashSet::new();
let mut timer = netbench::timer::Tokio::default();

trace.enter(timer.now(), conn_id, 0);
conn.run(&mut trace, &mut checkpoints, &mut timer).await?;
trace.exit(timer.now());

Ok(())
}
}

async fn server(&self) -> Result<TcpListener> {
let server = TcpListener::bind((self.opts.ip, self.opts.port)).await?;
Ok(server)
}

fn identity(&self) -> Result<Identity> {
let (_, private_key) = self.opts.certificate();
let cert = &private_key.pkcs12;
let ident = Identity::from_pkcs12(cert, "")?;
Ok(ident)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use netbench::Result;
use s2n_quic::provider::{io, tls::default::certificate::IntoCertificate};
use std::collections::HashSet;
use structopt::StructOpt;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), String> {
Client::from_args().run().await.map_err(|e| e.to_string())
}

#[derive(Debug, StructOpt)]
pub struct Client {
#[structopt(flatten)]
opts: netbench_driver::Client,

#[structopt(long)]
disable_gso: bool,
}

impl Client {
pub async fn run(&self) -> Result<()> {
let addresses = self.opts.address_map().await?;
let scenario = self.opts.scenario();

let client = self.client()?;
let client = netbench::Client::new(client, &scenario, &addresses);
let mut trace = self.opts.trace();
let mut checkpoints = HashSet::new();
let mut timer = netbench::timer::Tokio::default();
let mut client = client.run(&mut trace, &mut checkpoints, &mut timer).await?;

client.wait_idle().await?;

Ok(())
}

fn client(&self) -> Result<s2n_quic::Client> {
let mut tls = s2n_quic::provider::tls::default::Client::builder()
// handle larger cert chains
.with_max_cert_chain_depth(10)?
.with_application_protocols(
self.opts.application_protocols.iter().map(String::as_bytes),
)?
.with_key_logging()?;

for ca in self.opts.certificate_authorities() {
tls = tls.with_certificate(ca.pem.as_str())?;
}

let tls = tls.build()?;

let mut io_builder =
io::Default::builder().with_receive_address((self.opts.local_ip, 0u16).into())?;

if self.disable_gso {
io_builder = io_builder.with_gso_disabled()?;
}

let io = io_builder.build()?;

let client = s2n_quic::Client::builder()
.with_io(io)?
.with_tls(tls)?
.start()
.unwrap();

Ok(client)
}
}
Loading

0 comments on commit bca2e3e

Please sign in to comment.