Skip to content

Commit

Permalink
Fix warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Apr 14, 2024
1 parent 7fd6646 commit fa6f5a4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 69 deletions.
6 changes: 3 additions & 3 deletions core/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_channel::{Receiver, Sender};
use async_lock::{Mutex, RwLock};

use atomic_refcell::AtomicRefCell;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::stream::FuturesUnordered;
use nodecraft::{resolver::AddressResolver, CheapClone, Node};

use super::{
Expand Down Expand Up @@ -316,7 +316,7 @@ where
if let Err(e) = self.transport.shutdown().await {
tracing::error!(err=%e, "memberlist: failed to shutdown transport");
return Err(e);
}
}

Ok(())
}
Expand Down Expand Up @@ -472,7 +472,7 @@ where
&self,
shutdown_rx: Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
use futures::FutureExt;
use futures::{FutureExt, StreamExt};

let queue_check_interval = self.inner.opts.queue_check_interval;
let this = self.clone();
Expand Down
22 changes: 6 additions & 16 deletions transports/net/src/packet_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,29 +356,19 @@ where
};
let keys = encryptor.keys().await;
if encrypted_message_size <= offload_size {
return Self::decrypt(
algo,
keys,
packet_label.as_bytes(),
&mut encrypted_message,
)
.and_then(|_| Self::read_from_packet_without_compression_and_encryption(encrypted_message));
return Self::decrypt(algo, keys, packet_label.as_bytes(), &mut encrypted_message).and_then(
|_| Self::read_from_packet_without_compression_and_encryption(encrypted_message),
);
}

let (tx, rx) = futures::channel::oneshot::channel();

rayon::spawn(move || {
if tx
.send(
Self::decrypt(
algo,
keys,
packet_label.as_bytes(),
&mut encrypted_message,
)
.and_then(|_| {
Self::read_from_packet_without_compression_and_encryption(encrypted_message)
}),
Self::decrypt(algo, keys, packet_label.as_bytes(), &mut encrypted_message).and_then(
|_| Self::read_from_packet_without_compression_and_encryption(encrypted_message),
),
)
.is_err()
{
Expand Down
50 changes: 0 additions & 50 deletions transports/quic/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,53 +601,3 @@ mod s2n_stream_layer {
Options::new("localhost".into(), p.join("cert.pem"), p.join("key.pem"))
}
}

#[cfg(test)]
mod test {
use agnostic::tokio::TokioRuntime;
use memberlist_core::transport::Lpe;
use nodecraft::resolver::socket_addr::SocketAddrResolver;

use crate::{quinn::Quinn, QuicTransport, QuicTransportOptions};

use super::*;

#[test]
fn test_shutdown_cleanup() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let layer = quinn_stream_layer::<TokioRuntime>().await;
let addr = "127.0.0.1:0".parse().unwrap();
let mut opts = QuicTransportOptions::with_stream_layer_options("test".into(), layer);
opts.add_bind_address(addr);
let transport = QuicTransport::<
SmolStr,
SocketAddrResolver<TokioRuntime>,
Quinn<TokioRuntime>,
Lpe<_, _>,
TokioRuntime,
>::new(opts)
.await
.unwrap();
let resolved_addr = *transport.advertise_address();

// drop the transport now
transport.shutdown().await;
drop(transport);

// we should be able to bind to the same address
let layer = quinn_stream_layer::<TokioRuntime>().await;
let mut opts = QuicTransportOptions::with_stream_layer_options("test".into(), layer);
opts.add_bind_address(resolved_addr);
let _ = QuicTransport::<
SmolStr,
SocketAddrResolver<TokioRuntime>,
Quinn<TokioRuntime>,
Lpe<_, _>,
TokioRuntime,
>::new(opts)
.await
.unwrap();
});
}
}

0 comments on commit fa6f5a4

Please sign in to comment.