diff --git a/client/Cargo.toml b/client/Cargo.toml index fda52bc2..bd0fb882 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -19,7 +19,7 @@ ash-window = "0.10.0" directories = "4.0.1" vk-shader-macros = "0.2.5" na = { package = "nalgebra", version = "0.19" } -tokio = { version = "0.2.13", features = ["rt-threaded", "sync", "macros"] } +tokio = { version = "1.18.2", features = ["rt-multi-thread", "sync", "macros"] } png = "0.17.5" anyhow = "1.0.26" whoami = "1.2.1" @@ -27,9 +27,9 @@ serde = { version = "1.0.104", features = ["derive", "rc"] } toml = "0.5.5" fxhash = "0.2.1" downcast-rs = "1.1.1" -quinn = "0.6.1" +quinn = "0.8.3" futures-util = "0.3.1" -rustls = { version = "0.17.0", features = ["dangerous_configuration"] } +rustls = { version = "0.20.6", features = ["dangerous_configuration"] } webpki = "0.21.0" hecs = "0.7.6" rcgen = { version = "0.9.2", default-features = false } diff --git a/client/src/lahar_deprecated/transfer.rs b/client/src/lahar_deprecated/transfer.rs index 21f135ee..445f3ccb 100644 --- a/client/src/lahar_deprecated/transfer.rs +++ b/client/src/lahar_deprecated/transfer.rs @@ -166,7 +166,7 @@ impl Reactor { let cmd = self.prepare(sender); op(&mut self.ctx, cmd); } - Err(TryRecvError::Closed) => return Err(self::Disconnected), + Err(TryRecvError::Disconnected) => return Err(self::Disconnected), Err(TryRecvError::Empty) => return Ok(()), } } diff --git a/client/src/loader.rs b/client/src/loader.rs index d4b95724..7d0f117e 100644 --- a/client/src/loader.rs +++ b/client/src/loader.rs @@ -51,10 +51,7 @@ pub struct Loader { impl Loader { pub fn new(cfg: Arc, gfx: Arc) -> Self { - let runtime = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); let (send, recv) = mpsc::unbounded_channel(); let staging = StagingBuffer::new(gfx.device.clone(), &gfx.memory_properties, 32 * 1024 * 1024); @@ -161,7 +158,7 @@ impl Loader { self.runtime.spawn(async move { while let Some(x) = input_recv.recv().await { let shared = shared.clone(); - let mut out = output_send.clone(); + let out = output_send.clone(); tokio::spawn(async move { match shared.ctx.load(x).await { Ok(x) => { diff --git a/client/src/main.rs b/client/src/main.rs index ce339ab6..f70991f7 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -32,10 +32,8 @@ fn main() { let _guard = span.enter(); if let Err(e) = server::run( server::NetParams { - certificate_chain: quinn::CertificateChain::from_certs( - quinn::Certificate::from_der(&cert), - ), - private_key: quinn::PrivateKey::from_der(&key).unwrap(), + certificate_chain: vec![rustls::Certificate(cert)], + private_key: rustls::PrivateKey(key), socket, }, sim_cfg, diff --git a/client/src/net.rs b/client/src/net.rs index 3d0153c0..25e458bf 100644 --- a/client/src/net.rs +++ b/client/src/net.rs @@ -37,20 +37,19 @@ pub enum Message { ConnectionLost(Error), } -#[tokio::main(core_threads = 1)] +#[tokio::main(worker_threads = 1)] async fn run( cfg: Arc, incoming: mpsc::UnboundedSender, outgoing: mpsc::UnboundedReceiver, ) -> Result<()> { - let mut endpoint = quinn::Endpoint::builder(); - let mut client_cfg = quinn::ClientConfig::default(); - let tls_cfg = Arc::get_mut(&mut client_cfg.crypto).unwrap(); - tls_cfg - .dangerous() - .set_certificate_verifier(Arc::new(AcceptAnyCert)); - endpoint.default_client_config(client_cfg); - let (endpoint, _) = endpoint.bind(&"[::]:0".parse().unwrap())?; + let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?; + let crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(Arc::new(AcceptAnyCert)) + .with_no_client_auth(); + let client_cfg = quinn::ClientConfig::new(Arc::new(crypto)); + endpoint.set_default_client_config(client_cfg); let result = inner(cfg, incoming, outgoing, endpoint.clone()).await; endpoint.wait_idle().await; @@ -68,7 +67,7 @@ async fn inner( connection, mut uni_streams, .. - } = endpoint.connect(&server, "localhost").unwrap().await?; + } = endpoint.connect(server, "localhost").unwrap().await?; // Open the first stream for our hello message let clienthello_stream = connection.open_uni().await?; @@ -138,14 +137,16 @@ async fn handle_unordered( struct AcceptAnyCert; -impl rustls::ServerCertVerifier for AcceptAnyCert { +impl rustls::client::ServerCertVerifier for AcceptAnyCert { fn verify_server_cert( &self, - _roots: &rustls::RootCertStore, - _presented_certs: &[rustls::Certificate], - _dns_name: webpki::DNSNameRef, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, _ocsp_response: &[u8], - ) -> Result { - Ok(rustls::ServerCertVerified::assertion()) + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) } } diff --git a/common/Cargo.toml b/common/Cargo.toml index f86f8a48..7aca1658 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -13,7 +13,7 @@ serde = { version = "1.0.104", features = ["derive"] } na = { package = "nalgebra", version = "0.19", features = ["serde-serialize"] } bincode = "1.2.1" anyhow = "1.0.26" -quinn = "0.6.1" +quinn = "0.8.3" lazy_static = "1.4.0" fxhash = "0.2.1" tracing = "0.1.10" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5cc35225..3814eec2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,8 +11,9 @@ license = "Apache-2.0 OR Zlib" [dependencies] common = { path = "../common" } tracing = "0.1.10" -tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "sync"] } -quinn = "0.6.1" +tokio = { version = "1.18.2", features = ["rt-multi-thread", "time", "macros", "sync"] } +tokio-stream = "0.1.8" +quinn = { version = "0.8.3", features = ["rustls"] } serde = { version = "1.0.104", features = ["derive", "rc"] } toml = "0.5.5" anyhow = "1.0.26" @@ -24,3 +25,5 @@ rand = { version = "0.7.2", features = [ "small_rng" ] } fxhash = "0.2.1" na = { package = "nalgebra", version = "0.19" } slotmap = "1.0.6" +rustls = "0.20.6" +rustls-pemfile = "1.0.0" diff --git a/server/src/lib.rs b/server/src/lib.rs index e9b4f858..2b818e2e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -12,6 +12,7 @@ use futures::{select, StreamExt, TryStreamExt}; use hecs::Entity; use slotmap::DenseSlotMap; use tokio::sync::mpsc; +use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; use tracing::{debug, error, error_span, info, trace}; use common::{codec, proto, SimConfig}; @@ -19,20 +20,21 @@ use input_queue::InputQueue; use sim::Sim; pub struct NetParams { - pub certificate_chain: quinn::CertificateChain, - pub private_key: quinn::PrivateKey, + pub certificate_chain: Vec, + pub private_key: rustls::PrivateKey, pub socket: UdpSocket, } #[tokio::main] pub async fn run(net: NetParams, sim: SimConfig) -> Result<()> { - let mut server_config = quinn::ServerConfigBuilder::default(); - server_config - .certificate(net.certificate_chain, net.private_key) - .context("parsing certificate")?; - let mut endpoint = quinn::Endpoint::builder(); - endpoint.listen(server_config.build()); - let (endpoint, incoming) = endpoint.with_socket(net.socket)?; + let server_config = + quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key) + .context("parsing certificate")?; + let (endpoint, incoming) = quinn::Endpoint::new( + quinn::EndpointConfig::default(), + Some(server_config), + net.socket, + )?; info!(address = %endpoint.local_addr().unwrap(), "listening"); let server = Server::new(sim); @@ -57,12 +59,15 @@ impl Server { } async fn run(mut self, incoming: quinn::Incoming) { - let mut ticks = tokio::time::interval(Duration::from_secs(1) / self.cfg.rate as u32).fuse(); + let mut ticks = IntervalStream::new(tokio::time::interval( + Duration::from_secs(1) / self.cfg.rate as u32, + )) + .fuse(); let mut incoming = incoming .inspect(|x| trace!(address = %x.remote_address(), "connection incoming")) .buffer_unordered(16); let (client_events_send, client_events) = mpsc::channel(128); - let mut client_events = client_events.fuse(); + let mut client_events = ReceiverStream::new(client_events).fuse(); loop { select! { _ = ticks.next() => { self.on_step(); }, @@ -130,7 +135,7 @@ impl Server { assert!(client.handles.is_none()); let snapshot = Arc::new(self.sim.snapshot()); let (id, entity) = self.sim.spawn_character(hello); - let (mut ordered_send, ordered_recv) = mpsc::channel(32); + let (ordered_send, ordered_recv) = mpsc::channel(32); ordered_send.try_send(snapshot).unwrap(); let (unordered_send, unordered_recv) = mpsc::channel(32); client.handles = Some(ClientHandles { @@ -232,7 +237,7 @@ async fn drive_send( conn: quinn::Connection, hello: proto::ServerHello, unordered: mpsc::Receiver, - mut ordered: mpsc::Receiver, + ordered: mpsc::Receiver, ) -> Result<()> { let mut stream = conn.open_uni().await?; codec::send(&mut stream, &hello).await?; @@ -242,6 +247,7 @@ async fn drive_send( let _ = drive_send_unordered(conn.clone(), unordered).await; }); + let mut ordered = ReceiverStream::new(ordered); while let Some(msg) = ordered.next().await { codec::send(&mut stream, &msg).await?; } @@ -251,8 +257,9 @@ async fn drive_send( async fn drive_send_unordered( conn: quinn::Connection, - mut msgs: mpsc::Receiver, + msgs: mpsc::Receiver, ) -> Result<()> { + let mut msgs = ReceiverStream::new(msgs); while let Some(msg) = msgs.next().await { let stream = conn.open_uni().await?; codec::send_whole(stream, &msg).await?; diff --git a/server/src/main.rs b/server/src/main.rs index 6af08721..9b0ec55e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -3,7 +3,6 @@ mod config; use std::{fs, net::UdpSocket, path::Path}; use anyhow::{anyhow, Context, Result}; -use quinn::{Certificate, CertificateChain, PrivateKey}; use tracing::warn; use common::SimConfig; @@ -27,12 +26,21 @@ pub fn run() -> Result<()> { let (certificate_chain, private_key) = match (&cfg.certificate_chain, &cfg.private_key) { (&Some(ref certificate_chain), &Some(ref private_key)) => ( - CertificateChain::from_pem( - &fs::read(certificate_chain).context("reading certificate chain")?, + rustls_pemfile::certs( + &mut &*fs::read(certificate_chain).context("reading certificate chain")?, ) - .context("parsing certificate chain")?, - PrivateKey::from_pem(&fs::read(private_key).context("reading private key")?) - .context("parsing private key")?, + .context("parsing certificate chain")? + .into_iter() + .map(rustls::Certificate) + .collect(), + rustls_pemfile::pkcs8_private_keys( + &mut &*fs::read(private_key).context("reading private key")?, + ) + .context("parsing private key")? + .into_iter() + .map(rustls::PrivateKey) + .next() + .ok_or_else(|| anyhow!("no private key found with PKCS #8 format"))?, ), _ => { // TODO: Cache on disk @@ -50,10 +58,7 @@ pub fn run() -> Result<()> { .unwrap(); let key = cert.serialize_private_key_der(); let cert = cert.serialize_der().unwrap(); - ( - CertificateChain::from_certs(Certificate::from_der(&cert)), - PrivateKey::from_der(&key).unwrap(), - ) + (vec![rustls::Certificate(cert)], rustls::PrivateKey(key)) } };