diff --git a/Cargo.lock b/Cargo.lock index 16bd70ac..a53f69c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,12 +93,6 @@ dependencies = [ "syn 2.0.29", ] -[[package]] -name = "autocfg" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" - [[package]] name = "base64" version = "0.13.1" @@ -362,96 +356,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" -[[package]] -name = "futures" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" - -[[package]] -name = "futures-executor" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", - "num_cpus", -] - -[[package]] -name = "futures-io" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" - -[[package]] -name = "futures-macro" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.29", -] - -[[package]] -name = "futures-sink" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" - -[[package]] -name = "futures-task" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" - -[[package]] -name = "futures-util" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -665,16 +569,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "num_enum" version = "0.7.0" @@ -769,18 +663,6 @@ dependencies = [ "sha2", ] -[[package]] -name = "pin-project-lite" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkg-config" version = "0.3.27" @@ -996,15 +878,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - [[package]] name = "strsim" version = "0.10.0" @@ -1299,7 +1172,6 @@ dependencies = [ "config", "env_logger", "epoll", - "futures", "log", "serde", "serde_yaml", diff --git a/crates/vsock/Cargo.toml b/crates/vsock/Cargo.toml index c4371f33..3788e637 100644 --- a/crates/vsock/Cargo.toml +++ b/crates/vsock/Cargo.toml @@ -17,7 +17,6 @@ byteorder = "1" clap = { version = "4.4", features = ["derive"] } env_logger = "0.10" epoll = "4.3.2" -futures = { version = "0.3", features = ["thread-pool"] } log = "0.4" thiserror = "1.0" vhost = { version = "0.8", features = ["vhost-user-slave"] } diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index e67cc165..34ef99a6 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -115,8 +115,6 @@ pub(crate) enum Error { IterateQueue, #[error("No rx request available")] NoRequestRx, - #[error("Unable to create thread pool")] - CreateThreadPool(std::io::Error), #[error("Packet missing data buffer")] PktBufMissing, #[error("Failed to connect to unix socket")] diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index b0fe7740..fcefc4a4 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -12,10 +12,11 @@ use std::{ net::{UnixListener, UnixStream}, prelude::{AsRawFd, FromRawFd, RawFd}, }, - sync::{Arc, RwLock}, + sync::mpsc::Sender, + sync::{mpsc, Arc, RwLock}, + thread, }; -use futures::executor::{ThreadPool, ThreadPoolBuilder}; use log::warn; use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT}; use virtio_queue::QueueOwnedT; @@ -42,6 +43,15 @@ enum RxQueueType { Standard, RawPkts, } + +// Data which is required by a worker handling event idx. +struct EventData { + vring: VringRwLock, + event_idx: bool, + head_idx: u16, + used_len: usize, +} + pub(crate) struct VhostUserVsockThread { /// Guest memory map. pub mem: Option>, @@ -61,8 +71,8 @@ pub(crate) struct VhostUserVsockThread { pub thread_backend: VsockThreadBackend, /// CID of the guest. guest_cid: u64, - /// Thread pool to handle event idx. - pool: ThreadPool, + /// Channel to a worker which handles event idx. + sender: Sender, /// host side port on which application listens. local_port: Wrapping, /// The tx buffer size @@ -126,7 +136,15 @@ impl VhostUserVsockThread { ), ); } - + let (sender, receiver) = mpsc::channel::(); + thread::spawn(move || loop { + // TODO: Understand why doing the following in the background thread works. + // maybe we'd better have thread pool for the entire application if necessary. + let Ok(event_data) = receiver.recv() else { + break; + }; + Self::vring_handle_event(event_data); + }); let thread = VhostUserVsockThread { mem: None, event_idx: false, @@ -137,10 +155,7 @@ impl VhostUserVsockThread { epoll_file, thread_backend, guest_cid, - pool: ThreadPoolBuilder::new() - .pool_size(1) - .create() - .map_err(Error::CreateThreadPool)?, + sender, local_port: Wrapping(0), tx_buffer_size, sibling_event_fd, @@ -152,6 +167,37 @@ impl VhostUserVsockThread { Ok(thread) } + fn vring_handle_event(event_data: EventData) { + if event_data.event_idx { + if event_data + .vring + .add_used(event_data.head_idx, event_data.used_len as u32) + .is_err() + { + warn!("Could not return used descriptors to ring"); + } + match event_data.vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); + event_data.vring.signal_used_queue().unwrap(); + } + Ok(needs_notification) => { + if needs_notification { + event_data.vring.signal_used_queue().unwrap(); + } + } + } + } else { + if event_data + .vring + .add_used(event_data.head_idx, event_data.used_len as u32) + .is_err() + { + warn!("Could not return used descriptors to ring"); + } + event_data.vring.signal_used_queue().unwrap(); + } + } /// Register a file with an epoll to listen for events in evset. pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> { epoll::ctl( @@ -504,31 +550,14 @@ impl VhostUserVsockThread { let vring = vring.clone(); let event_idx = self.event_idx; - - self.pool.spawn_ok(async move { - // TODO: Understand why doing the following in the pool works - if event_idx { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - match vring.needs_notification() { - Err(_) => { - warn!("Could not check if queue needs to be notified"); - vring.signal_used_queue().unwrap(); - } - Ok(needs_notification) => { - if needs_notification { - vring.signal_used_queue().unwrap(); - } - } - } - } else { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - vring.signal_used_queue().unwrap(); - } - }); + self.sender + .send(EventData { + vring, + event_idx, + head_idx, + used_len, + }) + .unwrap(); match rx_queue_type { RxQueueType::Standard => { @@ -661,30 +690,14 @@ impl VhostUserVsockThread { let vring = vring.clone(); let event_idx = self.event_idx; - - self.pool.spawn_ok(async move { - if event_idx { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - match vring.needs_notification() { - Err(_) => { - warn!("Could not check if queue needs to be notified"); - vring.signal_used_queue().unwrap(); - } - Ok(needs_notification) => { - if needs_notification { - vring.signal_used_queue().unwrap(); - } - } - } - } else { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - vring.signal_used_queue().unwrap(); - } - }); + self.sender + .send(EventData { + vring, + event_idx, + head_idx, + used_len, + }) + .unwrap(); } Ok(used_any)