Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio and quinn upgrade #220

Merged
merged 3 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ ash-window = "0.10.0"
directories = "4.0.1"
vk-shader-macros = "0.2.5"
na = { package = "nalgebra", version = "0.19" }
tokio = { version = "0.2.13", features = ["rt-threaded", "sync", "macros"] }
tokio = { version = "1.18.2", features = ["rt-multi-thread", "sync", "macros"] }
png = "0.17.5"
anyhow = "1.0.26"
whoami = "1.2.1"
serde = { version = "1.0.104", features = ["derive", "rc"] }
toml = "0.5.5"
fxhash = "0.2.1"
downcast-rs = "1.1.1"
quinn = "0.6.1"
quinn = "0.8.3"
futures-util = "0.3.1"
rustls = { version = "0.17.0", features = ["dangerous_configuration"] }
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
webpki = "0.21.0"
hecs = "0.7.6"
rcgen = { version = "0.9.2", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion client/src/lahar_deprecated/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Reactor {
let cmd = self.prepare(sender);
op(&mut self.ctx, cmd);
}
Err(TryRecvError::Closed) => return Err(self::Disconnected),
Err(TryRecvError::Disconnected) => return Err(self::Disconnected),
Err(TryRecvError::Empty) => return Ok(()),
}
}
Expand Down
7 changes: 2 additions & 5 deletions client/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ pub struct Loader {

impl Loader {
pub fn new(cfg: Arc<Config>, gfx: Arc<Base>) -> Self {
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let (send, recv) = mpsc::unbounded_channel();
let staging =
StagingBuffer::new(gfx.device.clone(), &gfx.memory_properties, 32 * 1024 * 1024);
Expand Down Expand Up @@ -161,7 +158,7 @@ impl Loader {
self.runtime.spawn(async move {
while let Some(x) = input_recv.recv().await {
let shared = shared.clone();
let mut out = output_send.clone();
let out = output_send.clone();
tokio::spawn(async move {
match shared.ctx.load(x).await {
Ok(x) => {
Expand Down
6 changes: 2 additions & 4 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ fn main() {
let _guard = span.enter();
if let Err(e) = server::run(
server::NetParams {
certificate_chain: quinn::CertificateChain::from_certs(
quinn::Certificate::from_der(&cert),
),
private_key: quinn::PrivateKey::from_der(&key).unwrap(),
certificate_chain: vec![rustls::Certificate(cert)],
private_key: rustls::PrivateKey(key),
socket,
},
sim_cfg,
Expand Down
33 changes: 17 additions & 16 deletions client/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,19 @@ pub enum Message {
ConnectionLost(Error),
}

#[tokio::main(core_threads = 1)]
#[tokio::main(worker_threads = 1)]
async fn run(
cfg: Arc<Config>,
incoming: mpsc::UnboundedSender<Message>,
outgoing: mpsc::UnboundedReceiver<proto::Command>,
) -> Result<()> {
let mut endpoint = quinn::Endpoint::builder();
let mut client_cfg = quinn::ClientConfig::default();
let tls_cfg = Arc::get_mut(&mut client_cfg.crypto).unwrap();
tls_cfg
.dangerous()
.set_certificate_verifier(Arc::new(AcceptAnyCert));
endpoint.default_client_config(client_cfg);
let (endpoint, _) = endpoint.bind(&"[::]:0".parse().unwrap())?;
let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
let crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(AcceptAnyCert))
.with_no_client_auth();
let client_cfg = quinn::ClientConfig::new(Arc::new(crypto));
endpoint.set_default_client_config(client_cfg);

let result = inner(cfg, incoming, outgoing, endpoint.clone()).await;
endpoint.wait_idle().await;
Expand All @@ -68,7 +67,7 @@ async fn inner(
connection,
mut uni_streams,
..
} = endpoint.connect(&server, "localhost").unwrap().await?;
} = endpoint.connect(server, "localhost").unwrap().await?;

// Open the first stream for our hello message
let clienthello_stream = connection.open_uni().await?;
Expand Down Expand Up @@ -138,14 +137,16 @@ async fn handle_unordered(

struct AcceptAnyCert;

impl rustls::ServerCertVerifier for AcceptAnyCert {
impl rustls::client::ServerCertVerifier for AcceptAnyCert {
fn verify_server_cert(
&self,
_roots: &rustls::RootCertStore,
_presented_certs: &[rustls::Certificate],
_dns_name: webpki::DNSNameRef,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
) -> Result<rustls::ServerCertVerified, rustls::TLSError> {
Ok(rustls::ServerCertVerified::assertion())
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ serde = { version = "1.0.104", features = ["derive"] }
na = { package = "nalgebra", version = "0.19", features = ["serde-serialize"] }
bincode = "1.2.1"
anyhow = "1.0.26"
quinn = "0.6.1"
quinn = "0.8.3"
lazy_static = "1.4.0"
fxhash = "0.2.1"
tracing = "0.1.10"
Expand Down
7 changes: 5 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ license = "Apache-2.0 OR Zlib"
[dependencies]
common = { path = "../common" }
tracing = "0.1.10"
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "sync"] }
quinn = "0.6.1"
tokio = { version = "1.18.2", features = ["rt-multi-thread", "time", "macros", "sync"] }
tokio-stream = "0.1.8"
quinn = { version = "0.8.3", features = ["rustls"] }
serde = { version = "1.0.104", features = ["derive", "rc"] }
toml = "0.5.5"
anyhow = "1.0.26"
Expand All @@ -24,3 +25,5 @@ rand = { version = "0.7.2", features = [ "small_rng" ] }
fxhash = "0.2.1"
na = { package = "nalgebra", version = "0.19" }
slotmap = "1.0.6"
rustls = "0.20.6"
rustls-pemfile = "1.0.0"
35 changes: 21 additions & 14 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,29 @@ use futures::{select, StreamExt, TryStreamExt};
use hecs::Entity;
use slotmap::DenseSlotMap;
use tokio::sync::mpsc;
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
use tracing::{debug, error, error_span, info, trace};

use common::{codec, proto, SimConfig};
use input_queue::InputQueue;
use sim::Sim;

pub struct NetParams {
pub certificate_chain: quinn::CertificateChain,
pub private_key: quinn::PrivateKey,
pub certificate_chain: Vec<rustls::Certificate>,
pub private_key: rustls::PrivateKey,
pub socket: UdpSocket,
}

#[tokio::main]
pub async fn run(net: NetParams, sim: SimConfig) -> Result<()> {
let mut server_config = quinn::ServerConfigBuilder::default();
server_config
.certificate(net.certificate_chain, net.private_key)
.context("parsing certificate")?;
let mut endpoint = quinn::Endpoint::builder();
endpoint.listen(server_config.build());
let (endpoint, incoming) = endpoint.with_socket(net.socket)?;
let server_config =
quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key)
.context("parsing certificate")?;
let (endpoint, incoming) = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
net.socket,
)?;
info!(address = %endpoint.local_addr().unwrap(), "listening");

let server = Server::new(sim);
Expand All @@ -57,12 +59,15 @@ impl Server {
}

async fn run(mut self, incoming: quinn::Incoming) {
let mut ticks = tokio::time::interval(Duration::from_secs(1) / self.cfg.rate as u32).fuse();
let mut ticks = IntervalStream::new(tokio::time::interval(
Duration::from_secs(1) / self.cfg.rate as u32,
))
.fuse();
let mut incoming = incoming
.inspect(|x| trace!(address = %x.remote_address(), "connection incoming"))
.buffer_unordered(16);
let (client_events_send, client_events) = mpsc::channel(128);
let mut client_events = client_events.fuse();
let mut client_events = ReceiverStream::new(client_events).fuse();
loop {
select! {
_ = ticks.next() => { self.on_step(); },
Expand Down Expand Up @@ -130,7 +135,7 @@ impl Server {
assert!(client.handles.is_none());
let snapshot = Arc::new(self.sim.snapshot());
let (id, entity) = self.sim.spawn_character(hello);
let (mut ordered_send, ordered_recv) = mpsc::channel(32);
let (ordered_send, ordered_recv) = mpsc::channel(32);
ordered_send.try_send(snapshot).unwrap();
let (unordered_send, unordered_recv) = mpsc::channel(32);
client.handles = Some(ClientHandles {
Expand Down Expand Up @@ -232,7 +237,7 @@ async fn drive_send(
conn: quinn::Connection,
hello: proto::ServerHello,
unordered: mpsc::Receiver<Unordered>,
mut ordered: mpsc::Receiver<Ordered>,
ordered: mpsc::Receiver<Ordered>,
) -> Result<()> {
let mut stream = conn.open_uni().await?;
codec::send(&mut stream, &hello).await?;
Expand All @@ -242,6 +247,7 @@ async fn drive_send(
let _ = drive_send_unordered(conn.clone(), unordered).await;
});

let mut ordered = ReceiverStream::new(ordered);
while let Some(msg) = ordered.next().await {
codec::send(&mut stream, &msg).await?;
}
Expand All @@ -251,8 +257,9 @@ async fn drive_send(

async fn drive_send_unordered(
conn: quinn::Connection,
mut msgs: mpsc::Receiver<Unordered>,
msgs: mpsc::Receiver<Unordered>,
) -> Result<()> {
let mut msgs = ReceiverStream::new(msgs);
while let Some(msg) = msgs.next().await {
let stream = conn.open_uni().await?;
codec::send_whole(stream, &msg).await?;
Expand Down
25 changes: 15 additions & 10 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod config;
use std::{fs, net::UdpSocket, path::Path};

use anyhow::{anyhow, Context, Result};
use quinn::{Certificate, CertificateChain, PrivateKey};
use tracing::warn;

use common::SimConfig;
Expand All @@ -27,12 +26,21 @@ pub fn run() -> Result<()> {

let (certificate_chain, private_key) = match (&cfg.certificate_chain, &cfg.private_key) {
(&Some(ref certificate_chain), &Some(ref private_key)) => (
CertificateChain::from_pem(
&fs::read(certificate_chain).context("reading certificate chain")?,
rustls_pemfile::certs(
&mut &*fs::read(certificate_chain).context("reading certificate chain")?,
)
.context("parsing certificate chain")?,
PrivateKey::from_pem(&fs::read(private_key).context("reading private key")?)
.context("parsing private key")?,
.context("parsing certificate chain")?
.into_iter()
.map(rustls::Certificate)
.collect(),
rustls_pemfile::pkcs8_private_keys(
&mut &*fs::read(private_key).context("reading private key")?,
)
.context("parsing private key")?
.into_iter()
.map(rustls::PrivateKey)
.next()
.ok_or_else(|| anyhow!("no private key found with PKCS #8 format"))?,
),
_ => {
// TODO: Cache on disk
Expand All @@ -50,10 +58,7 @@ pub fn run() -> Result<()> {
.unwrap();
let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().unwrap();
(
CertificateChain::from_certs(Certificate::from_der(&cert)),
PrivateKey::from_der(&key).unwrap(),
)
(vec![rustls::Certificate(cert)], rustls::PrivateKey(key))
}
};

Expand Down