[vsock] refactor VhostUserVsockThread worker #450

Merged on Sep 26, 2023 (2 commits)
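
For orientation: the refactor drops the single-thread futures ThreadPool and instead drains a std::sync::mpsc channel from one dedicated worker thread. Below is a minimal, self-contained sketch of that pattern; the Job type and handle_job function are illustrative stand-ins, not the crate's actual API.

use std::sync::mpsc;
use std::thread;

// Illustrative stand-in for the per-event state sent to the worker.
struct Job {
    id: u32,
}

// Illustrative handler; the PR's worker calls Self::vring_handle_event here.
fn handle_job(job: Job) {
    println!("handling job {}", job.id);
}

fn main() {
    // One channel plus one spawned thread replaces the pool of size 1.
    let (sender, receiver) = mpsc::channel::<Job>();
    let worker = thread::spawn(move || loop {
        // recv() blocks until a message arrives and returns Err once
        // every Sender has been dropped, which ends the loop.
        let Ok(job) = receiver.recv() else {
            break;
        };
        handle_job(job);
    });

    sender.send(Job { id: 1 }).unwrap();
    sender.send(Job { id: 2 }).unwrap();
    drop(sender); // close the channel so the worker thread exits
    worker.join().unwrap();
}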
Changes from all commits
128 changes: 0 additions & 128 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/vsock/Cargo.toml
@@ -17,7 +17,6 @@ byteorder = "1"
 clap = { version = "4.4", features = ["derive"] }
 env_logger = "0.10"
 epoll = "4.3.2"
-futures = { version = "0.3", features = ["thread-pool"] }
 log = "0.4"
 thiserror = "1.0"
 vhost = { version = "0.8", features = ["vhost-user-slave"] }
2 changes: 0 additions & 2 deletions crates/vsock/src/vhu_vsock.rs
@@ -115,8 +115,6 @@ pub(crate) enum Error {
     IterateQueue,
     #[error("No rx request available")]
     NoRequestRx,
-    #[error("Unable to create thread pool")]
-    CreateThreadPool(std::io::Error),
     #[error("Packet missing data buffer")]
     PktBufMissing,
     #[error("Failed to connect to unix socket")]
129 changes: 71 additions & 58 deletions crates/vsock/src/vhu_vsock_thread.rs
@@ -12,10 +12,11 @@ use std::{
         net::{UnixListener, UnixStream},
         prelude::{AsRawFd, FromRawFd, RawFd},
     },
-    sync::{Arc, RwLock},
+    sync::mpsc::Sender,
+    sync::{mpsc, Arc, RwLock},
+    thread,
 };
 
-use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use log::warn;
 use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
 use virtio_queue::QueueOwnedT;
@@ -42,6 +43,15 @@ enum RxQueueType {
     Standard,
     RawPkts,
 }
+
+// Data which is required by a worker handling event idx.
+struct EventData {
+    vring: VringRwLock,
+    event_idx: bool,
+    head_idx: u16,
+    used_len: usize,
+}
+
 pub(crate) struct VhostUserVsockThread {
     /// Guest memory map.
     pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
@@ -61,8 +71,8 @@ pub(crate) struct VhostUserVsockThread {
     pub thread_backend: VsockThreadBackend,
     /// CID of the guest.
     guest_cid: u64,
-    /// Thread pool to handle event idx.
-    pool: ThreadPool,
+    /// Channel to a worker which handles event idx.
+    sender: Sender<EventData>,
     /// host side port on which application listens.
     local_port: Wrapping<u32>,
     /// The tx buffer size
@@ -126,7 +136,15 @@ impl VhostUserVsockThread {
                 ),
             );
         }
 
+        let (sender, receiver) = mpsc::channel::<EventData>();
+        thread::spawn(move || loop {

Inline review on this closure:

Reviewer: "Nit: It might be worth factoring this closure out to be its own function that explicitly takes the captured parameters by value."

Contributor Author: "done. is it what you want?"


+            // TODO: Understand why doing the following in the background thread works.
+            // maybe we'd better have thread pool for the entire application if necessary.
+            let Ok(event_data) = receiver.recv() else {
+                break;
+            };
+            Self::vring_handle_event(event_data);
+        });
         let thread = VhostUserVsockThread {
             mem: None,
             event_idx: false,
@@ -137,10 +155,7 @@ impl VhostUserVsockThread {
             epoll_file,
             thread_backend,
             guest_cid,
-            pool: ThreadPoolBuilder::new()
-                .pool_size(1)
-                .create()
-                .map_err(Error::CreateThreadPool)?,
+            sender,
             local_port: Wrapping(0),
             tx_buffer_size,
             sibling_event_fd,
@@ -152,6 +167,37 @@
         Ok(thread)
     }

+    fn vring_handle_event(event_data: EventData) {
+        if event_data.event_idx {
+            if event_data
+                .vring
+                .add_used(event_data.head_idx, event_data.used_len as u32)
+                .is_err()
+            {
+                warn!("Could not return used descriptors to ring");
+            }
+            match event_data.vring.needs_notification() {
+                Err(_) => {
+                    warn!("Could not check if queue needs to be notified");
+                    event_data.vring.signal_used_queue().unwrap();
+                }
+                Ok(needs_notification) => {
+                    if needs_notification {
+                        event_data.vring.signal_used_queue().unwrap();
+                    }
+                }
+            }
+        } else {
+            if event_data
+                .vring
+                .add_used(event_data.head_idx, event_data.used_len as u32)
+                .is_err()
+            {
+                warn!("Could not return used descriptors to ring");
+            }
+            event_data.vring.signal_used_queue().unwrap();
+        }
+    }
     /// Register a file with an epoll to listen for events in evset.
     pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
         epoll::ctl(
@@ -504,31 +550,14 @@ impl VhostUserVsockThread {
 
         let vring = vring.clone();
         let event_idx = self.event_idx;
-
-        self.pool.spawn_ok(async move {
-            // TODO: Understand why doing the following in the pool works
-            if event_idx {
-                if vring.add_used(head_idx, used_len as u32).is_err() {
-                    warn!("Could not return used descriptors to ring");
-                }
-                match vring.needs_notification() {
-                    Err(_) => {
-                        warn!("Could not check if queue needs to be notified");
-                        vring.signal_used_queue().unwrap();
-                    }
-                    Ok(needs_notification) => {
-                        if needs_notification {
-                            vring.signal_used_queue().unwrap();
-                        }
-                    }
-                }
-            } else {
-                if vring.add_used(head_idx, used_len as u32).is_err() {
-                    warn!("Could not return used descriptors to ring");
-                }
-                vring.signal_used_queue().unwrap();
-            }
-        });
+        self.sender
+            .send(EventData {
+                vring,
+                event_idx,
+                head_idx,
+                used_len,
+            })
+            .unwrap();
 
         match rx_queue_type {
             RxQueueType::Standard => {
@@ -661,30 +690,14 @@
 
         let vring = vring.clone();
         let event_idx = self.event_idx;
-
-        self.pool.spawn_ok(async move {
-            if event_idx {
-                if vring.add_used(head_idx, used_len as u32).is_err() {
-                    warn!("Could not return used descriptors to ring");
-                }
-                match vring.needs_notification() {
-                    Err(_) => {
-                        warn!("Could not check if queue needs to be notified");
-                        vring.signal_used_queue().unwrap();
-                    }
-                    Ok(needs_notification) => {
-                        if needs_notification {
-                            vring.signal_used_queue().unwrap();
-                        }
-                    }
-                }
-            } else {
-                if vring.add_used(head_idx, used_len as u32).is_err() {
-                    warn!("Could not return used descriptors to ring");
-                }
-                vring.signal_used_queue().unwrap();
-            }
-        });
+        self.sender
+            .send(EventData {
+                vring,
+                event_idx,
+                head_idx,
+                used_len,
+            })
+            .unwrap();
     }
 
         Ok(used_any)
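
The inline review nit above, which the author addressed, is a general Rust cleanup: turn a long move closure into a named function that takes its captured variables as explicit by-value parameters. A reduced before/after sketch follows, with a hypothetical u16/usize payload standing in for the real vring state:

// Before: the logic lives inline and captures head_idx/used_len implicitly.
fn spawn_inline(head_idx: u16, used_len: usize) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        println!("head={head_idx} used={used_len}");
    })
}

// After: the body is a named, separately testable function with explicit
// by-value parameters; the spawned closure shrinks to a call site.
fn handle(head_idx: u16, used_len: usize) {
    println!("head={head_idx} used={used_len}");
}

fn spawn_factored(head_idx: u16, used_len: usize) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || handle(head_idx, used_len))
}

fn main() {
    spawn_inline(1, 16).join().unwrap();
    spawn_factored(2, 32).join().unwrap();
}

The PR's version additionally bundles the parameters into the EventData struct, since an mpsc channel carries exactly one message type.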