Skip to content

Commit

Permalink
[vsock] refactor VhostUserVsockThread worker
Browse files Browse the repository at this point in the history
For now, VhostUserVsockThread uses thread pool executor from futures,
but it doesn't need to use thread pool executor and futures because
we just need background worker thread, and a way to let it work.

So I removed unnecessary external dependency and made the logic simpler
by using just thread and channel

Signed-off-by: Jeongik Cha <jeongik@google.com>
  • Loading branch information
ikicha committed Sep 19, 2023
1 parent 38caab2 commit 8b5c07f
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 189 deletions.
128 changes: 0 additions & 128 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/vsock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ byteorder = "1"
clap = { version = "4.4", features = ["derive"] }
env_logger = "0.10"
epoll = "4.3.2"
futures = { version = "0.3", features = ["thread-pool"] }
log = "0.4"
thiserror = "1.0"
vhost = { version = "0.8", features = ["vhost-user-slave"] }
Expand Down
2 changes: 0 additions & 2 deletions crates/vsock/src/vhu_vsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ pub(crate) enum Error {
IterateQueue,
#[error("No rx request available")]
NoRequestRx,
#[error("Unable to create thread pool")]
CreateThreadPool(std::io::Error),
#[error("Packet missing data buffer")]
PktBufMissing,
#[error("Failed to connect to unix socket")]
Expand Down
122 changes: 64 additions & 58 deletions crates/vsock/src/vhu_vsock_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use std::{
net::{UnixListener, UnixStream},
prelude::{AsRawFd, FromRawFd, RawFd},
},
sync::{Arc, RwLock},
sync::mpsc::Sender,
sync::{mpsc, Arc, RwLock},
thread,
};

use futures::executor::{ThreadPool, ThreadPoolBuilder};
use log::warn;
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
use virtio_queue::QueueOwnedT;
Expand All @@ -42,6 +43,15 @@ enum RxQueueType {
Standard,
RawPkts,
}

// Data which is required by a worker handling event idx.
struct EventData {
vring: VringRwLock,
event_idx: bool,
head_idx: u16,
used_len: usize,
}

pub(crate) struct VhostUserVsockThread {
/// Guest memory map.
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
Expand All @@ -61,8 +71,8 @@ pub(crate) struct VhostUserVsockThread {
pub thread_backend: VsockThreadBackend,
/// CID of the guest.
guest_cid: u64,
/// Thread pool to handle event idx.
pool: ThreadPool,
/// Channel to a worker which handles event idx.
sender: Sender<EventData>,
/// host side port on which application listens.
local_port: Wrapping<u32>,
/// The tx buffer size
Expand Down Expand Up @@ -126,7 +136,39 @@ impl VhostUserVsockThread {
),
);
}

let (sender, receiver) = mpsc::channel::<EventData>();
thread::spawn(move || loop {
let event_data = receiver.recv().unwrap();
if event_data.event_idx {
if event_data
.vring
.add_used(event_data.head_idx, event_data.used_len as u32)
.is_err()
{
warn!("Could not return used descriptors to ring");
}
match event_data.vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
event_data.vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
event_data.vring.signal_used_queue().unwrap();
}
}
}
} else {
if event_data
.vring
.add_used(event_data.head_idx, event_data.used_len as u32)
.is_err()
{
warn!("Could not return used descriptors to ring");
}
event_data.vring.signal_used_queue().unwrap();
}
});
let thread = VhostUserVsockThread {
mem: None,
event_idx: false,
Expand All @@ -137,10 +179,7 @@ impl VhostUserVsockThread {
epoll_file,
thread_backend,
guest_cid,
pool: ThreadPoolBuilder::new()
.pool_size(1)
.create()
.map_err(Error::CreateThreadPool)?,
sender,
local_port: Wrapping(0),
tx_buffer_size,
sibling_event_fd,
Expand Down Expand Up @@ -504,31 +543,14 @@ impl VhostUserVsockThread {

let vring = vring.clone();
let event_idx = self.event_idx;

self.pool.spawn_ok(async move {
// TODO: Understand why doing the following in the pool works
if event_idx {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
match vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
vring.signal_used_queue().unwrap();
}
}
}
} else {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
vring.signal_used_queue().unwrap();
}
});
self.sender
.send(EventData {
vring,
event_idx,
head_idx,
used_len,
})
.unwrap();

match rx_queue_type {
RxQueueType::Standard => {
Expand Down Expand Up @@ -661,30 +683,14 @@ impl VhostUserVsockThread {

let vring = vring.clone();
let event_idx = self.event_idx;

self.pool.spawn_ok(async move {
if event_idx {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
match vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
vring.signal_used_queue().unwrap();
}
}
}
} else {
if vring.add_used(head_idx, used_len as u32).is_err() {
warn!("Could not return used descriptors to ring");
}
vring.signal_used_queue().unwrap();
}
});
self.sender
.send(EventData {
vring,
event_idx,
head_idx,
used_len,
})
.unwrap();
}

Ok(used_any)
Expand Down

0 comments on commit 8b5c07f

Please sign in to comment.