From ce60145f0b346e3eb632fa14d10e036037470121 Mon Sep 17 00:00:00 2001 From: Babis Chalios Date: Thu, 12 Sep 2024 21:05:53 +0200 Subject: [PATCH] net: use readv for reading frames from TAP device Right now, we are performing two copies for writing a frame from the TAP device into guest memory. We first read the frame in an array held by the Net device and then copy that array in a DescriptorChain. In order to avoid the double copy use the readv system call to read directly from the TAP device into the buffers described by DescriptorChain. The main challenge with this is that DescriptorChain objects describe memory that is at least 65562 bytes long when guest TSO4, TSO6 or UFO are enabled or 1526 otherwise and parsing the chain includes overhead which we pay even if the frame we are receiving is much smaller than these sizes. PR https://github.com/firecracker-microvm/firecracker/pull/4748 reduced the overheads involved with parsing DescriptorChain objects. To further avoid this overhead, move the parsing of DescriptorChain objects out of the hot path of process_rx() where we are actually receiving a frame into process_rx_queue_event() where we get the notification that the guest added new buffers for network RX. 
Signed-off-by: Babis Chalios --- .../seccomp/aarch64-unknown-linux-musl.json | 102 ++++ .../seccomp/x86_64-unknown-linux-musl.json | 102 ++++ src/vmm/src/devices/virtio/net/device.rs | 565 ++++++++++-------- src/vmm/src/devices/virtio/net/mod.rs | 6 + src/vmm/src/devices/virtio/net/tap.rs | 59 +- src/vmm/src/devices/virtio/net/test_utils.rs | 20 +- 6 files changed, 557 insertions(+), 297 deletions(-) diff --git a/resources/seccomp/aarch64-unknown-linux-musl.json b/resources/seccomp/aarch64-unknown-linux-musl.json index 868e7ce0e99..f0a27b6e40d 100644 --- a/resources/seccomp/aarch64-unknown-linux-musl.json +++ b/resources/seccomp/aarch64-unknown-linux-musl.json @@ -32,6 +32,108 @@ "syscall": "writev", "comment": "Used by the VirtIO net device to write to tap" }, + { + "syscall": "readv", + "comment": "Used by the VirtIO net device to read from tap" + }, + { + "syscall": "memfd_create", + "comment": "Used by the IovDeque implementation" + }, + { + "syscall": "fcntl", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 1033, + "comment": "FCNTL_F_SETFD" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 6, + "comment": "F_SEAL_SHRINK|F_SEAL_GROW" + } + ] + }, + { + "syscall": "fcntl", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 1033, + "comment": "FCNTL_F_SETFD" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 1, + "comment": "F_SEAL_SEAL" + } + ] + }, + { + "syscall": "mmap", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 4096, + "comment": "Page size allocation" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 3, + "comment": "PROT_READ|PROT_WRITE" + }, + { + "index": 3, + "type": "dword", + "op": "eq", + "val": 17, + "comment": "MAP_SHARED|MAP_FIXED" + } + ] + }, + { + "syscall": "mmap", + 
"comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 8192, + "comment": "2 pages allocation" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 0, + "comment": "PROT_NONE" + }, + { + "index": 3, + "type": "dword", + "op": "eq", + "val": 34, + "comment": "MAP_PRIVATE|MAP_ANONYMOUS" + } + ] + }, { "syscall": "fsync" }, diff --git a/resources/seccomp/x86_64-unknown-linux-musl.json b/resources/seccomp/x86_64-unknown-linux-musl.json index e5b4b690196..630211f47d1 100644 --- a/resources/seccomp/x86_64-unknown-linux-musl.json +++ b/resources/seccomp/x86_64-unknown-linux-musl.json @@ -32,6 +32,108 @@ "syscall": "writev", "comment": "Used by the VirtIO net device to write to tap" }, + { + "syscall": "readv", + "comment": "Used by the VirtIO net device to read from tap" + }, + { + "syscall": "memfd_create", + "comment": "Used by the IovDeque implementation" + }, + { + "syscall": "fcntl", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 1033, + "comment": "FCNTL_F_SETFD" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 6, + "comment": "F_SEAL_SHRINK|F_SEAL_GROW" + } + ] + }, + { + "syscall": "fcntl", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 1033, + "comment": "FCNTL_F_SETFD" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 1, + "comment": "F_SEAL_SEAL" + } + ] + }, + { + "syscall": "mmap", + "comment": "Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 4096, + "comment": "Page size allocation" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 3, + "comment": "PROT_READ|PROT_WRITE" + }, + { + "index": 3, + "type": "dword", + "op": "eq", + "val": 17, + "comment": "MAP_SHARED|MAP_FIXED" + } + ] + }, + { + "syscall": "mmap", + "comment": 
"Used by the IovDeque implementation", + "args": [ + { + "index": 1, + "type": "dword", + "op": "eq", + "val": 8192, + "comment": "2 pages allocation" + }, + { + "index": 2, + "type": "dword", + "op": "eq", + "val": 0, + "comment": "PROT_NONE" + }, + { + "index": 3, + "type": "dword", + "op": "eq", + "val": 34, + "comment": "MAP_PRIVATE|MAP_ANONYMOUS" + } + ] + }, { "syscall": "fsync" }, diff --git a/src/vmm/src/devices/virtio/net/device.rs b/src/vmm/src/devices/virtio/net/device.rs index f8c29f95175..ed6f36dcb0b 100755 --- a/src/vmm/src/devices/virtio/net/device.rs +++ b/src/vmm/src/devices/virtio/net/device.rs @@ -5,14 +5,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. -use std::io::Read; +use std::collections::VecDeque; use std::mem; use std::net::Ipv4Addr; use std::sync::{Arc, Mutex}; use libc::EAGAIN; -use log::{error, warn}; -use vm_memory::GuestMemoryError; +use log::error; use vmm_sys_util::eventfd::EventFd; use crate::devices::virtio::device::{DeviceState, IrqTrigger, IrqType, VirtioDevice}; @@ -23,13 +22,15 @@ use crate::devices::virtio::gen::virtio_net::{ VIRTIO_NET_F_HOST_TSO6, VIRTIO_NET_F_HOST_UFO, VIRTIO_NET_F_MAC, }; use crate::devices::virtio::gen::virtio_ring::VIRTIO_RING_F_EVENT_IDX; -use crate::devices::virtio::iovec::IoVecBuffer; +use crate::devices::virtio::iovec::{ + IoVecBuffer, IoVecBufferMut, IoVecError, ParsedDescriptorChain, +}; use crate::devices::virtio::net::metrics::{NetDeviceMetrics, NetMetricsPerDevice}; use crate::devices::virtio::net::tap::Tap; use crate::devices::virtio::net::{ gen, NetError, NetQueue, MAX_BUFFER_SIZE, NET_QUEUE_SIZES, RX_INDEX, TX_INDEX, }; -use crate::devices::virtio::queue::{DescriptorChain, Queue}; +use crate::devices::virtio::queue::{DescriptorChain, Queue, FIRECRACKER_MAX_QUEUE_SIZE}; use crate::devices::virtio::{ActivateError, TYPE_NET}; use crate::devices::{report_net_event_fail, DeviceError}; use 
crate::dumbo::pdu::arp::ETH_IPV4_FRAME_LEN; @@ -40,24 +41,10 @@ use crate::mmds::ns::MmdsNetworkStack; use crate::rate_limiter::{BucketUpdate, RateLimiter, TokenType}; use crate::utils::net::mac::MacAddr; use crate::utils::u64_to_usize; -use crate::vstate::memory::{ByteValued, Bytes, GuestMemoryMmap}; +use crate::vstate::memory::{ByteValued, GuestMemoryMmap}; const FRAME_HEADER_MAX_LEN: usize = PAYLOAD_OFFSET + ETH_IPV4_FRAME_LEN; -#[derive(Debug, thiserror::Error, displaydoc::Display)] -enum FrontendError { - /// Add user. - AddUsed, - /// Descriptor chain too mall. - DescriptorChainTooSmall, - /// Empty queue. - EmptyQueue, - /// Guest memory error: {0} - GuestMemory(GuestMemoryError), - /// Read only descriptor. - ReadOnlyDescriptor, -} - pub(crate) const fn vnet_hdr_len() -> usize { mem::size_of::() } @@ -102,6 +89,118 @@ pub struct ConfigSpace { // SAFETY: `ConfigSpace` contains only PODs in `repr(C)` or `repr(transparent)`, without padding. unsafe impl ByteValued for ConfigSpace {} +#[derive(Debug, thiserror::Error, displaydoc::Display)] +enum AddRxBufferError { + /// Error while parsing new buffer: {0} + Parsing(#[from] IoVecError), + /// RX buffer is too small + BufferTooSmall, +} + +/// A map of all the memory the guest has provided us with for performing RX +#[derive(Debug)] +pub struct RxBuffers { + // minimum size of a usable buffer for doing RX + pub min_buffer_size: u32, + // An [`IoVecBufferMut`] covering all the memory we have available for receiving network + // frames. + pub iovec: IoVecBufferMut, + // A map of which part of the memory belongs to which `DescriptorChain` object + pub parsed_descriptors: VecDeque, + // Buffers that we have used and they are ready to be given back to the guest. 
+ pub deferred_descriptor: Option, +} + +impl RxBuffers { + /// Create a new [`RxBuffers`] object for storing guest memory for performing RX + fn new() -> Result { + Ok(Self { + min_buffer_size: 0, + iovec: IoVecBufferMut::new()?, + parsed_descriptors: VecDeque::with_capacity(FIRECRACKER_MAX_QUEUE_SIZE.into()), + deferred_descriptor: None, + }) + } + + /// Add a new `DescriptorChain` that we received from the RX queue in the buffer. + /// + /// SAFETY: The `DescriptorChain` cannot be referencing the same memory location as any other + /// `DescriptorChain`. (See also related comment in + /// [`IoVecBufferMut::append_descriptor_chain`]). + unsafe fn add_buffer( + &mut self, + mem: &GuestMemoryMmap, + head: DescriptorChain, + ) -> Result<(), AddRxBufferError> { + let parsed_dc = self.iovec.append_descriptor_chain(mem, head)?; + if parsed_dc.length < self.min_buffer_size { + self.iovec.drop_descriptor_chain(&parsed_dc); + return Err(AddRxBufferError::BufferTooSmall); + } + self.parsed_descriptors.push_back(parsed_dc); + Ok(()) + } + + /// Returns the number of available `iovec` objects. + #[inline(always)] + fn len(&self) -> usize { + self.iovec.len() + } + + /// Returns `true` if there aren't any available `iovec` objects. + #[inline(always)] + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Mark the first `size` bytes of available memory as used. + /// + /// # Safety: + /// + /// * The `RxBuffers` should include at least one parsed `DescriptorChain`. + /// * `size` needs to be smaller or equal to total length of the first `DescriptorChain` stored + /// in the `RxBuffers`. + unsafe fn mark_used(&mut self, size: u32) -> ParsedDescriptorChain { + // Since we were able to write a frame in guest memory, we should have at least one + // descriptor chain here. If not, we have a bug, so fail fast, since the device is + // fundamentally broken. + let mut parsed_dc = self.parsed_descriptors.pop_front().expect( + "net: internal bug. 
Mismatch between written frame size and available descriptors", + ); + + self.header_set_num_buffers(1); + self.iovec.drop_descriptor_chain(&parsed_dc); + parsed_dc.length = size; + parsed_dc + } + + /// Write the number of descriptors used in VirtIO header + fn header_set_num_buffers(&mut self, nr_descs: u16) { + // We can unwrap here, because we have checked before that the `IoVecBufferMut` holds at + // least one buffer with the proper size, depending on the feature negotiation. In any + // case, the buffer holds memory of at least `std::mem::size_of::()` + // bytes. + self.iovec + .write_all_volatile_at( + &nr_descs.to_le_bytes(), + std::mem::offset_of!(virtio_net_hdr_v1, num_buffers), + ) + .unwrap() + } + + /// This will let the guest know about all the `DescriptorChain` objects that have been + /// used to receive a frame from the TAP. + fn finish_frame(&mut self, dc: &ParsedDescriptorChain, rx_queue: &mut Queue) { + // It is fine to `.unwrap()` here. The only reason why `add_used` can fail is if the + // `head_index` is not a valid descriptor id. `head_index` here is a valid + // `DescriptorChain` index. We got it from `queue.pop_or_enable_notification()` which + // checks for its validity. In other words, if this unwrap() fails there's a bug in our + // emulation logic which, most likely, we can't recover from. So, let's crash here + // instead of logging an error and continuing. + rx_queue.add_used(dc.head_index, dc.length).unwrap(); + } +} + /// VirtIO network device. 
/// /// It emulates a network device able to exchange L2 frames between the guest @@ -122,9 +221,6 @@ pub struct Net { pub(crate) rx_rate_limiter: RateLimiter, pub(crate) tx_rate_limiter: RateLimiter, - pub(crate) rx_deferred_frame: bool, - - rx_bytes_read: usize, rx_frame_buf: [u8; MAX_BUFFER_SIZE], tx_frame_headers: [u8; frame_hdr_len()], @@ -143,6 +239,7 @@ pub struct Net { pub(crate) metrics: Arc, tx_buffer: IoVecBuffer, + pub(crate) rx_buffer: RxBuffers, } impl Net { @@ -189,8 +286,6 @@ impl Net { queue_evts, rx_rate_limiter, tx_rate_limiter, - rx_deferred_frame: false, - rx_bytes_read: 0, rx_frame_buf: [0u8; MAX_BUFFER_SIZE], tx_frame_headers: [0u8; frame_hdr_len()], irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?, @@ -201,6 +296,7 @@ impl Net { mmds_ns: None, metrics: NetMetricsPerDevice::alloc(id), tx_buffer: Default::default(), + rx_buffer: RxBuffers::new()?, }) } @@ -311,126 +407,50 @@ impl Net { // Attempts to copy a single frame into the guest if there is enough // rate limiting budget. // Returns true on successful frame delivery. - fn rate_limited_rx_single_frame(&mut self) -> bool { - if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) { + fn rate_limited_rx_single_frame(&mut self, dc: &ParsedDescriptorChain) -> bool { + let rx_queue = &mut self.queues[RX_INDEX]; + if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, dc.length as u64) { self.metrics.rx_rate_limiter_throttled.inc(); return false; } - // Attempt frame delivery. - let success = self.write_frame_to_guest(); - - // Undo the tokens consumption if guest delivery failed. 
- if !success { - // revert the rate limiting budget consumption - Self::rate_limiter_replenish_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64); - } - - success + self.rx_buffer.finish_frame(dc, rx_queue); + true } - /// Write a slice in a descriptor chain - /// - /// # Errors - /// - /// Returns an error if the descriptor chain is too short or - /// an inappropriate (read only) descriptor is found in the chain - fn write_to_descriptor_chain( - mem: &GuestMemoryMmap, - data: &[u8], - head: DescriptorChain, - net_metrics: &NetDeviceMetrics, - ) -> Result<(), FrontendError> { - let mut chunk = data; - let mut next_descriptor = Some(head); - - while let Some(descriptor) = &next_descriptor { - if !descriptor.is_write_only() { - return Err(FrontendError::ReadOnlyDescriptor); - } - - let len = std::cmp::min(chunk.len(), descriptor.len as usize); - match mem.write_slice(&chunk[..len], descriptor.addr) { - Ok(()) => { - net_metrics.rx_count.inc(); - chunk = &chunk[len..]; - } - Err(err) => { - error!("Failed to write slice: {:?}", err); - if let GuestMemoryError::PartialBuffer { .. } = err { - net_metrics.rx_partial_writes.inc(); - } - return Err(FrontendError::GuestMemory(err)); - } - } - - // If chunk is empty we are done here. 
- if chunk.is_empty() { - let len = data.len() as u64; - net_metrics.rx_bytes_count.add(len); - net_metrics.rx_packets_count.inc(); - return Ok(()); - } - - next_descriptor = descriptor.next_descriptor(); + /// Returns the minimum size of buffer we expect the guest to provide us depending on the + /// features we have negotiated with it + fn minimum_rx_buffer_size(&self) -> u32 { + if self.has_feature(VIRTIO_NET_F_GUEST_TSO4 as u64) + || self.has_feature(VIRTIO_NET_F_GUEST_TSO6 as u64) + || self.has_feature(VIRTIO_NET_F_GUEST_UFO as u64) + { + 65562 + } else { + 1526 } - - warn!("Receiving buffer is too small to hold frame of current size"); - Err(FrontendError::DescriptorChainTooSmall) } - // Copies a single frame from `self.rx_frame_buf` into the guest. - fn do_write_frame_to_guest(&mut self) -> Result<(), FrontendError> { + /// Parse available RX `DescriptorChains` from the queue + pub fn parse_rx_descriptors(&mut self) { // This is safe since we checked in the event handler that the device is activated. let mem = self.device_state.mem().unwrap(); - let queue = &mut self.queues[RX_INDEX]; - let head_descriptor = queue.pop_or_enable_notification().ok_or_else(|| { - self.metrics.no_rx_avail_buffer.inc(); - FrontendError::EmptyQueue - })?; - let head_index = head_descriptor.index; - - let result = Self::write_to_descriptor_chain( - mem, - &self.rx_frame_buf[..self.rx_bytes_read], - head_descriptor, - &self.metrics, - ); - // Mark the descriptor chain as used. If an error occurred, skip the descriptor chain. - let used_len = if result.is_err() { - self.metrics.rx_fails.inc(); - 0 - } else { - // Safe to unwrap because a frame must be smaller than 2^16 bytes. - u32::try_from(self.rx_bytes_read).unwrap() - }; - queue.add_used(head_index, used_len).map_err(|err| { - error!("Failed to add available descriptor {}: {}", head_index, err); - FrontendError::AddUsed - })?; - - result - } - - // Copies a single frame from `self.rx_frame_buf` into the guest. 
In case of an error retries - // the operation if possible. Returns true if the operation was successfull. - fn write_frame_to_guest(&mut self) -> bool { - let max_iterations = self.queues[RX_INDEX].actual_size(); - for _ in 0..max_iterations { - match self.do_write_frame_to_guest() { - Ok(()) => return true, - Err(FrontendError::EmptyQueue) | Err(FrontendError::AddUsed) => { - return false; - } - Err(_) => { - // retry - continue; + while let Some(head) = queue.pop_or_enable_notification() { + let index = head.index; + // SAFETY: we are only using this `DescriptorChain` here. + if let Err(err) = unsafe { self.rx_buffer.add_buffer(mem, head) } { + self.metrics.rx_fails.inc(); + error!("net: Could not parse an RX descriptor: {err}"); + // Try to add the bad descriptor to the used ring. + if let Err(err) = queue.add_used(index, 0) { + error!( + "net: Failed to add available RX descriptor {index} while handling a \ + parsing error: {err}", + ); } } } - - false } // Tries to detour the frame to MMDS and if MMDS doesn't accept it, sends it on the host TAP. @@ -508,7 +528,20 @@ impl Net { } // We currently prioritize packets from the MMDS over regular network packets. - fn read_from_mmds_or_tap(&mut self) -> Result { + fn read_from_mmds_or_tap(&mut self) -> Result, NetError> { + // If we don't have any buffers available try to parse more from the RX queue. There might + // be some buffers we didn't get the chance to process, because we got to handle the TAP + // event before the RX queue event. + if self.rx_buffer.is_empty() { + self.parse_rx_descriptors(); + + // If after parsing the RX queue we still don't have any buffers stop processing RX + // frames. + if self.rx_buffer.is_empty() { + return Ok(None); + } + } + if let Some(ns) = self.mmds_ns.as_mut() { if let Some(len) = ns.write_next_frame(frame_bytes_from_buf_mut(&mut self.rx_frame_buf)?) 
@@ -517,22 +550,48 @@ impl Net { METRICS.mmds.tx_frames.inc(); METRICS.mmds.tx_bytes.add(len as u64); init_vnet_hdr(&mut self.rx_frame_buf); - return Ok(vnet_hdr_len() + len); + self.rx_buffer + .iovec + .write_all_volatile_at(&self.rx_frame_buf[..vnet_hdr_len() + len], 0)?; + // SAFETY: This is safe: + // * We checked that `rx_buffer` includes at least one `DescriptorChain` + // * `rx_frame_buf` has size of `MAX_BUFFER_SIZE` and all `DescriptorChain` objects + // are at least that big. + let dc = unsafe { + self.rx_buffer + .mark_used((vnet_hdr_len() + len).try_into().unwrap()) + }; + + return Ok(Some(dc)); } } - self.read_tap().map_err(NetError::IO) + // SAFETY: this is safe because we ensured that `self.rx_buffer` has at least one + // DescriptorChain parsed in it. + let len = unsafe { self.read_tap().map_err(NetError::IO) }?; + + // SAFETY: This is safe, + // * `rx_buffer` has at least one `DescriptorChain` + // * `read_tap` passes the first `DescriptorChain` to `readv` so we can't have read more + // bytes than its capacity. + let dc = unsafe { self.rx_buffer.mark_used(len.try_into().unwrap()) }; + Ok(Some(dc)) } + /// Read as many frames as possible. fn process_rx(&mut self) -> Result<(), DeviceError> { - // Read as many frames as possible. loop { match self.read_from_mmds_or_tap() { - Ok(count) => { - self.rx_bytes_read = count; + Ok(None) => { + self.metrics.no_rx_avail_buffer.inc(); + break; + } + Ok(Some(dc)) => { self.metrics.rx_count.inc(); - if !self.rate_limited_rx_single_frame() { - self.rx_deferred_frame = true; + self.metrics.rx_bytes_count.add(dc.length as u64); + self.metrics.rx_packets_count.inc(); + if !self.rate_limited_rx_single_frame(&dc) { + self.rx_buffer.deferred_descriptor = Some(dc); break; } } @@ -558,24 +617,18 @@ impl Net { self.try_signal_queue(NetQueue::Rx) } - // Process the deferred frame first, then continue reading from tap. 
- fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> { - if self.rate_limited_rx_single_frame() { - self.rx_deferred_frame = false; - // process_rx() was interrupted possibly before consuming all - // packets in the tap; try continuing now. - return self.process_rx(); - } - - self.try_signal_queue(NetQueue::Rx) - } - fn resume_rx(&mut self) -> Result<(), DeviceError> { - if self.rx_deferred_frame { - self.handle_deferred_frame() - } else { - Ok(()) + // First try to handle any deferred frame + if let Some(deferred_descriptor) = self.rx_buffer.deferred_descriptor.take() { + // If can't finish sending this frame, re-set it as deferred and return; we can't + // process any more frames from the TAP. + if !self.rate_limited_rx_single_frame(&deferred_descriptor) { + self.rx_buffer.deferred_descriptor = Some(deferred_descriptor); + return Ok(()); + } } + + self.process_rx() } fn process_tx(&mut self) -> Result<(), DeviceError> { @@ -636,7 +689,7 @@ impl Net { &self.metrics, ) .unwrap_or(false); - if frame_consumed_by_mmds && !self.rx_deferred_frame { + if frame_consumed_by_mmds && self.rx_buffer.deferred_descriptor.is_none() { // MMDS consumed this frame/request, let's also try to process the response. process_rx_for_mmds = true; } @@ -715,8 +768,15 @@ impl Net { self.tx_rate_limiter.update_buckets(tx_bytes, tx_ops); } - fn read_tap(&mut self) -> std::io::Result { - self.tap.read(&mut self.rx_frame_buf) + /// Reads a frame from the TAP device inside the first descriptor held by `self.rx_buffer`. 
+ /// + /// # Safety + /// + /// `self.rx_buffer` needs to have at least one descriptor chain parsed + pub unsafe fn read_tap(&mut self) -> std::io::Result { + let nr_iovecs = self.rx_buffer.parsed_descriptors[0].nr_iovecs as usize; + self.tap + .read_iovec(&mut self.rx_buffer.iovec.as_iovec_mut_slice()[..nr_iovecs]) } fn write_tap(tap: &mut Tap, buf: &IoVecBuffer) -> std::io::Result { @@ -734,7 +794,12 @@ impl Net { // rate limiters present but with _very high_ allowed rate error!("Failed to get rx queue event: {:?}", err); self.metrics.event_fails.inc(); - } else if self.rx_rate_limiter.is_blocked() { + return; + } else { + self.parse_rx_descriptors(); + } + + if self.rx_rate_limiter.is_blocked() { self.metrics.rx_rate_limiter_throttled.inc(); } else { // If the limiter is not blocked, resume the receiving of bytes. @@ -747,31 +812,14 @@ impl Net { // This is safe since we checked in the event handler that the device is activated. self.metrics.rx_tap_event_count.inc(); - // While there are no available RX queue buffers and there's a deferred_frame - // don't process any more incoming. Otherwise start processing a frame. In the - // process the deferred_frame flag will be set in order to avoid freezing the - // RX queue. - if self.queues[RX_INDEX].is_empty() && self.rx_deferred_frame { - self.metrics.no_rx_avail_buffer.inc(); - return; - } - // While limiter is blocked, don't process any more incoming. if self.rx_rate_limiter.is_blocked() { self.metrics.rx_rate_limiter_throttled.inc(); return; } - if self.rx_deferred_frame - // Process a deferred frame first if available. Don't read from tap again - // until we manage to receive this deferred frame. 
- { - self.handle_deferred_frame() - .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); - } else { - self.process_rx() - .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); - } + self.resume_rx() + .unwrap_or_else(|err| report_net_event_fail(&self.metrics, err)); } /// Process a single TX queue event. @@ -913,6 +961,8 @@ impl VirtioDevice for Net { .set_offload(supported_flags) .map_err(super::super::ActivateError::TapSetOffload)?; + self.rx_buffer.min_buffer_size = self.minimum_rx_buffer_size(); + if self.activate_evt.write(1).is_err() { self.metrics.activate_fails.inc(); return Err(ActivateError::EventFd); @@ -958,6 +1008,14 @@ pub mod tests { use crate::utils::net::mac::{MacAddr, MAC_ADDR_LEN}; use crate::vstate::memory::{Address, GuestMemory}; + /// Write the number of descriptors used in VirtIO header + fn header_set_num_buffers(frame: &mut [u8], nr_descs: u16) { + let bytes = nr_descs.to_le_bytes(); + let offset = std::mem::offset_of!(virtio_net_hdr_v1, num_buffers); + frame[offset] = bytes[0]; + frame[offset + 1] = bytes[1]; + } + #[test] fn test_vnet_helpers() { let mut frame_buf = vec![42u8; vnet_hdr_len() - 1]; @@ -1144,9 +1202,14 @@ pub mod tests { (2, 1000, VIRTQ_DESC_F_WRITE), ], ); - let frame = th.check_rx_deferred_frame(1000); + let mut frame = inject_tap_tx_frame(&th.net(), 1000); + check_metric_after_block!( + th.net().metrics.rx_fails, + 1, + th.event_manager.run_with_timeout(100).unwrap() + ); th.rxq.check_used_elem(0, 0, 0); - + header_set_num_buffers(frame.as_mut_slice(), 1); th.check_rx_queue_resume(&frame); } @@ -1157,9 +1220,10 @@ pub mod tests { th.activate_net(); th.add_desc_chain(NetQueue::Rx, 0, &[(0, 100, VIRTQ_DESC_F_WRITE)]); - let frame = th.check_rx_deferred_frame(1000); + let mut frame = th.check_rx_discarded_buffer(1000); th.rxq.check_used_elem(0, 0, 0); + header_set_num_buffers(frame.as_mut_slice(), 1); th.check_rx_queue_resume(&frame); } @@ -1181,9 +1245,10 @@ pub mod tests { (2, 4096, 
VIRTQ_DESC_F_WRITE), ], ); - let frame = th.check_rx_deferred_frame(1000); + let mut frame = th.check_rx_discarded_buffer(1000); th.rxq.check_used_elem(0, 0, 0); + header_set_num_buffers(frame.as_mut_slice(), 1); th.check_rx_queue_resume(&frame); } @@ -1212,11 +1277,12 @@ pub mod tests { &[(4, 1000, VIRTQ_DESC_F_WRITE)], ); - // Add valid descriptor chain. - th.add_desc_chain(NetQueue::Rx, 1300, &[(5, 1000, VIRTQ_DESC_F_WRITE)]); + // Add valid descriptor chain. TestHelper does not negotiate any feature offloading so the + // buffers need to be at least 1526 bytes long. + th.add_desc_chain(NetQueue::Rx, 1300, &[(5, 1526, VIRTQ_DESC_F_WRITE)]); // Inject frame to tap and run epoll. - let frame = inject_tap_tx_frame(&th.net(), 1000); + let mut frame = inject_tap_tx_frame(&th.net(), 1000); check_metric_after_block!( th.net().metrics.rx_packets_count, 1, @@ -1231,10 +1297,11 @@ pub mod tests { th.rxq.check_used_elem(1, 3, 0); th.rxq.check_used_elem(2, 4, 0); // Check that the frame wasn't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_none()); // Check that the frame has been written successfully to the valid Rx descriptor chain. th.rxq .check_used_elem(3, 5, frame.len().try_into().unwrap()); + header_set_num_buffers(frame.as_mut_slice(), 1); th.rxq.dtable[5].check_data(&frame); } @@ -1257,7 +1324,7 @@ pub mod tests { ], ); // Inject frame to tap and run epoll. - let frame = inject_tap_tx_frame(&th.net(), 1000); + let mut frame = inject_tap_tx_frame(&th.net(), 1000); check_metric_after_block!( th.net().metrics.rx_packets_count, 1, @@ -1265,11 +1332,12 @@ pub mod tests { ); // Check that the frame wasn't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_none()); // Check that the used queue has advanced. 
assert_eq!(th.rxq.used.idx.get(), 1); assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // Check that the frame has been written successfully to the Rx descriptor chain. + header_set_num_buffers(frame.as_mut_slice(), 1); th.rxq .check_used_elem(0, 3, frame.len().try_into().unwrap()); th.rxq.dtable[3].check_data(&frame[..100]); @@ -1288,16 +1356,24 @@ pub mod tests { th.add_desc_chain( NetQueue::Rx, 0, - &[(0, 500, VIRTQ_DESC_F_WRITE), (1, 500, VIRTQ_DESC_F_WRITE)], + &[ + (0, 500, VIRTQ_DESC_F_WRITE), + (1, 500, VIRTQ_DESC_F_WRITE), + (2, 526, VIRTQ_DESC_F_WRITE), + ], ); th.add_desc_chain( NetQueue::Rx, - 1000, - &[(2, 500, VIRTQ_DESC_F_WRITE), (3, 500, VIRTQ_DESC_F_WRITE)], + 2000, + &[ + (3, 500, VIRTQ_DESC_F_WRITE), + (4, 500, VIRTQ_DESC_F_WRITE), + (5, 526, VIRTQ_DESC_F_WRITE), + ], ); // Inject 2 frames to tap and run epoll. - let frame_1 = inject_tap_tx_frame(&th.net(), 200); - let frame_2 = inject_tap_tx_frame(&th.net(), 300); + let mut frame_1 = inject_tap_tx_frame(&th.net(), 200); + let mut frame_2 = inject_tap_tx_frame(&th.net(), 300); check_metric_after_block!( th.net().metrics.rx_packets_count, 2, @@ -1305,20 +1381,24 @@ pub mod tests { ); // Check that the frames weren't deferred. - assert!(!th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_none()); // Check that the used queue has advanced. assert_eq!(th.rxq.used.idx.get(), 2); assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // Check that the 1st frame was written successfully to the 1st Rx descriptor chain. + header_set_num_buffers(frame_1.as_mut_slice(), 1); th.rxq .check_used_elem(0, 0, frame_1.len().try_into().unwrap()); th.rxq.dtable[0].check_data(&frame_1); th.rxq.dtable[1].check_data(&[0; 500]); + th.rxq.dtable[2].check_data(&[0; 526]); // Check that the 2nd frame was written successfully to the 2nd Rx descriptor chain. 
+ header_set_num_buffers(frame_2.as_mut_slice(), 1); th.rxq - .check_used_elem(1, 2, frame_2.len().try_into().unwrap()); - th.rxq.dtable[2].check_data(&frame_2); - th.rxq.dtable[3].check_data(&[0; 500]); + .check_used_elem(1, 3, frame_2.len().try_into().unwrap()); + th.rxq.dtable[3].check_data(&frame_2); + th.rxq.dtable[4].check_data(&[0; 500]); + th.rxq.dtable[2].check_data(&[0; 526]); } #[test] @@ -1605,6 +1685,19 @@ pub mod tests { fn test_mmds_detour_and_injection() { let mut net = default_net(); + // Inject a fake buffer in the devices buffers, otherwise we won't be able to receive the + // MMDS frame. One iovec will be just fine. + let mut fake_buffer = vec![0u8; 1024]; + let iov_buffer = IoVecBufferMut::from(fake_buffer.as_mut_slice()); + net.rx_buffer.iovec = iov_buffer; + net.rx_buffer + .parsed_descriptors + .push_back(ParsedDescriptorChain { + head_index: 1, + length: 1024, + nr_iovecs: 1, + }); + let src_mac = MacAddr::from_str("11:11:11:11:11:11").unwrap(); let src_ip = Ipv4Addr::new(10, 1, 2, 3); let dst_mac = MacAddr::from_str("22:22:22:22:22:22").unwrap(); @@ -1721,8 +1814,12 @@ pub mod tests { // SAFETY: its a valid fd unsafe { libc::close(th.net.lock().unwrap().tap.as_raw_fd()) }; - // The RX queue is empty and rx_deffered_frame is set. - th.net().rx_deferred_frame = true; + // The RX queue is empty and there is a deferred frame. + th.net().rx_buffer.deferred_descriptor = Some(ParsedDescriptorChain { + head_index: 1, + length: 100, + nr_iovecs: 1, + }); check_metric_after_block!( th.net().metrics.no_rx_avail_buffer, 1, @@ -1732,68 +1829,14 @@ pub mod tests { // We need to set this here to false, otherwise the device will try to // handle a deferred frame, it will fail and will never try to read from // the tap. - th.net().rx_deferred_frame = false; + th.net().rx_buffer.deferred_descriptor = None; - // Fake an avail buffer; this time, tap reading should error out. 
- th.rxq.avail.idx.set(1); - check_metric_after_block!( - th.net().metrics.tap_read_fails, - 1, - th.simulate_event(NetEvent::Tap) - ); - } - - #[test] - fn test_deferred_frame() { - let mem = single_region_mem(2 * MAX_BUFFER_SIZE); - let mut th = TestHelper::get_default(&mem); - th.activate_net(); - - let rx_packets_count = th.net().metrics.rx_packets_count.count(); - let _ = inject_tap_tx_frame(&th.net(), 1000); - // Trigger a Tap event that. This should fail since there - // are not any available descriptors in the queue - check_metric_after_block!( - th.net().metrics.no_rx_avail_buffer, - 1, - th.simulate_event(NetEvent::Tap) - ); - // The frame we read from the tap should be deferred now and - // no frames should have been transmitted - assert!(th.net().rx_deferred_frame); - assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count); - - // Let's add a second frame, which should really have the same - // fate. - let _ = inject_tap_tx_frame(&th.net(), 1000); - - // Adding a descriptor in the queue. This should handle the first deferred - // frame. However, this should try to handle the second tap as well and fail - // since there's only one Descriptor Chain in the queue. th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( - th.net().metrics.no_rx_avail_buffer, + th.net().metrics.tap_read_fails, 1, th.simulate_event(NetEvent::Tap) ); - // We should still have a deferred frame - assert!(th.net().rx_deferred_frame); - // However, we should have delivered the first frame - assert_eq!( - th.net().metrics.rx_packets_count.count(), - rx_packets_count + 1 - ); - - // Let's add one more descriptor and try to handle the last frame as well. 
- th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); - check_metric_after_block!( - th.net().metrics.rx_packets_count, - 1, - th.simulate_event(NetEvent::RxQueue) - ); - - // We should be done with any deferred frame - assert!(!th.net().rx_deferred_frame); } #[test] @@ -1908,10 +1951,10 @@ pub mod tests { let mut rl = RateLimiter::new(1000, 0, 500, 0, 0, 0).unwrap(); // set up RX - assert!(!th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_none()); th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); - let frame = inject_tap_tx_frame(&th.net(), 1000); + let mut frame = inject_tap_tx_frame(&th.net(), 1000); // use up the budget (do it after injecting the tx frame, as socket communication is // slow enough that the ratelimiter could replenish in the meantime). @@ -1928,7 +1971,7 @@ pub mod tests { // assert that limiter is blocked assert!(th.net().rx_rate_limiter.is_blocked()); assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1); - assert!(th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_some()); // assert that no operation actually completed (limiter blocked it) assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // make sure the data is still queued for processing @@ -1963,6 +2006,7 @@ pub mod tests { assert_eq!(th.rxq.used.idx.get(), 1); th.rxq .check_used_elem(0, 0, frame.len().try_into().unwrap()); + header_set_num_buffers(frame.as_mut_slice(), 1); th.rxq.dtable[0].check_data(&frame); } } @@ -2026,9 +2070,9 @@ pub mod tests { let mut rl = RateLimiter::new(0, 0, 0, 1, 0, 500).unwrap(); // set up RX - assert!(!th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_none()); th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]); - let frame = inject_tap_tx_frame(&th.net(), 1234); + let mut frame = inject_tap_tx_frame(&th.net(), 1234); // use up the initial budget assert!(rl.consume(1, TokenType::Ops)); 
@@ -2048,7 +2092,7 @@ pub mod tests { // assert that limiter is blocked assert!(th.net().rx_rate_limiter.is_blocked()); assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1); - assert!(th.net().rx_deferred_frame); + assert!(th.net().rx_buffer.deferred_descriptor.is_some()); // assert that no operation actually completed (limiter blocked it) assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring)); // make sure the data is still queued for processing @@ -2075,6 +2119,7 @@ pub mod tests { assert_eq!(th.rxq.used.idx.get(), 1); th.rxq .check_used_elem(0, 0, frame.len().try_into().unwrap()); + header_set_num_buffers(frame.as_mut_slice(), 1); th.rxq.dtable[0].check_data(&frame); } } diff --git a/src/vmm/src/devices/virtio/net/mod.rs b/src/vmm/src/devices/virtio/net/mod.rs index 1a7972595ad..e8a3f86ac72 100644 --- a/src/vmm/src/devices/virtio/net/mod.rs +++ b/src/vmm/src/devices/virtio/net/mod.rs @@ -27,8 +27,10 @@ pub mod test_utils; mod gen; pub use tap::{Tap, TapError}; +use vm_memory::VolatileMemoryError; pub use self::device::Net; +use super::iovec::IoVecError; /// Enum representing the Net device queue types #[derive(Debug)] @@ -50,6 +52,10 @@ pub enum NetError { EventFd(io::Error), /// IO error: {0} IO(io::Error), + /// Error writing in guest memory: {0} + GuestMemoryError(#[from] VolatileMemoryError), /// The VNET header is missing from the frame VnetHeaderMissing, + /// IoVecBuffer(Mut) error: {0} + IoVecError(#[from] IoVecError), } diff --git a/src/vmm/src/devices/virtio/net/tap.rs b/src/vmm/src/devices/virtio/net/tap.rs index 20024a1ae8e..4d1757edc8e 100644 --- a/src/vmm/src/devices/virtio/net/tap.rs +++ b/src/vmm/src/devices/virtio/net/tap.rs @@ -7,7 +7,7 @@ use std::fmt::{self, Debug}; use std::fs::File; -use std::io::{Error as IoError, Read}; +use std::io::Error as IoError; use std::os::raw::*; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; @@ -190,11 +190,19 @@ impl Tap { } Ok(usize::try_from(ret).unwrap()) } -} -impl Read for Tap { - 
fn read(&mut self, buf: &mut [u8]) -> Result { - self.tap_file.read(buf) + /// Read from tap to an `IoVecBufferMut` + pub(crate) fn read_iovec(&mut self, buffer: &mut [libc::iovec]) -> Result { + let iov = buffer.as_mut_ptr(); + let iovcnt = buffer.len().try_into().unwrap(); + + // SAFETY: `readv` is safe. Called with a valid tap fd, the iovec pointer and length + // are provided by the `IoVecBufferMut` implementation and we check the return value. + let ret = unsafe { libc::readv(self.tap_file.as_raw_fd(), iov, iovcnt) }; + if ret == -1 { + return Err(IoError::last_os_error()); + } + Ok(usize::try_from(ret).unwrap()) } } @@ -211,6 +219,7 @@ pub mod tests { use std::os::unix::ffi::OsStrExt; use super::*; + use crate::devices::virtio::iovec::IoVecBufferMut; use crate::devices::virtio::net::gen; use crate::devices::virtio::net::test_utils::{enable, if_index, TapTrafficSimulator}; @@ -218,7 +227,6 @@ pub mod tests { const VNET_HDR_SIZE: usize = 10; const PAYLOAD_SIZE: usize = 512; - const PACKET_SIZE: usize = 1024; #[test] fn test_tap_name() { @@ -287,23 +295,6 @@ pub mod tests { assert_eq!(tap.as_raw_fd(), tap.tap_file.as_raw_fd()); } - #[test] - fn test_read() { - let mut tap = Tap::open_named("").unwrap(); - enable(&tap); - let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&tap)); - - let packet = vmm_sys_util::rand::rand_alphanumerics(PAYLOAD_SIZE); - tap_traffic_simulator.push_tx_packet(packet.as_bytes()); - - let mut buf = [0u8; PACKET_SIZE]; - assert_eq!(tap.read(&mut buf).unwrap(), PAYLOAD_SIZE + VNET_HDR_SIZE); - assert_eq!( - &buf[VNET_HDR_SIZE..packet.len() + VNET_HDR_SIZE], - packet.as_bytes() - ); - } - #[test] fn test_write_iovec() { let mut tap = Tap::open_named("").unwrap(); @@ -339,4 +330,26 @@ pub mod tests { fragment3 ); } + + #[test] + fn test_read_iovec() { + let mut tap = Tap::open_named("").unwrap(); + enable(&tap); + let tap_traffic_simulator = TapTrafficSimulator::new(if_index(&tap)); + + let mut buff1 = vec![0; PAYLOAD_SIZE +
VNET_HDR_SIZE]; + let mut buff2 = vec![0; 2 * PAYLOAD_SIZE]; + + let mut rx_buffers = IoVecBufferMut::from(vec![buff1.as_mut_slice(), buff2.as_mut_slice()]); + + let packet = vmm_sys_util::rand::rand_alphanumerics(2 * PAYLOAD_SIZE); + tap_traffic_simulator.push_tx_packet(packet.as_bytes()); + assert_eq!( + tap.read_iovec(rx_buffers.as_iovec_mut_slice()).unwrap(), + 2 * PAYLOAD_SIZE + VNET_HDR_SIZE + ); + assert_eq!(&buff1[VNET_HDR_SIZE..], &packet.as_bytes()[..PAYLOAD_SIZE]); + assert_eq!(&buff2[..PAYLOAD_SIZE], &packet.as_bytes()[PAYLOAD_SIZE..]); + assert_eq!(&buff2[PAYLOAD_SIZE..], &vec![0; PAYLOAD_SIZE]) + } } diff --git a/src/vmm/src/devices/virtio/net/test_utils.rs b/src/vmm/src/devices/virtio/net/test_utils.rs index 07808bbb44b..eb1c6f6e883 100644 --- a/src/vmm/src/devices/virtio/net/test_utils.rs +++ b/src/vmm/src/devices/virtio/net/test_utils.rs @@ -430,8 +430,9 @@ pub mod test { event_fd.write(1).unwrap(); } - /// Generate a tap frame of `frame_len` and check that it is deferred - pub fn check_rx_deferred_frame(&mut self, frame_len: usize) -> Vec { + /// Generate a tap frame of `frame_len` and check that it is not read and + /// the descriptor chain has been discarded + pub fn check_rx_discarded_buffer(&mut self, frame_len: usize) -> Vec { let used_idx = self.rxq.used.idx.get(); // Inject frame to tap and run epoll. @@ -441,8 +442,6 @@ pub mod test { 0, self.event_manager.run_with_timeout(100).unwrap() ); - // Check that the frame has been deferred. - assert!(self.net().rx_deferred_frame); // Check that the descriptor chain has been discarded. assert_eq!(self.rxq.used.idx.get(), used_idx + 1); assert!(&self.net().irq_trigger.has_pending_irq(IrqType::Vring)); @@ -454,16 +453,9 @@ pub mod test { /// is eventually received by the guest pub fn check_rx_queue_resume(&mut self, expected_frame: &[u8]) { let used_idx = self.rxq.used.idx.get(); - // Add a valid Rx avail descriptor chain and run epoll. 
- self.add_desc_chain( - NetQueue::Rx, - 0, - &[( - 0, - u32::try_from(expected_frame.len()).unwrap(), - VIRTQ_DESC_F_WRITE, - )], - ); + // Add a valid Rx avail descriptor chain and run epoll. We do not negotiate any feature + // offloading so the buffers need to be at least 1526 bytes long. + self.add_desc_chain(NetQueue::Rx, 0, &[(0, 1526, VIRTQ_DESC_F_WRITE)]); check_metric_after_block!( self.net().metrics.rx_packets_count, 1,