diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cb588c6..1662e10 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,6 +55,12 @@ jobs: # the backend that uses poll, and is not a public API. RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_poll_backend if: startsWith(matrix.os, 'ubuntu') + - run: cargo test + env: + # Note: This cfg is intended to make it easy for polling developers to test + # the backend that uses pipes, and is not a public API. + RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe + if: startsWith(matrix.os, 'ubuntu') - run: cargo hack build --feature-powerset --no-dev-deps - name: Add rust-src if: startsWith(matrix.rust, 'nightly') diff --git a/src/epoll.rs b/src/epoll.rs index 6d39f9f..222a41d 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -7,7 +7,9 @@ use std::time::Duration; use rustix::event::{epoll, eventfd, EventfdFlags}; use rustix::fd::OwnedFd; -use rustix::io::{read, write}; +use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; +use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags}; +use rustix::pipe::{pipe, pipe_with, PipeFlags}; use rustix::time::{ timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags, Timespec, @@ -20,8 +22,10 @@ use crate::{Event, PollMode}; pub struct Poller { /// File descriptor for the epoll instance. epoll_fd: OwnedFd, - /// File descriptor for the eventfd that produces notifications. - event_fd: OwnedFd, + + /// Notifier used to wake up epoll. + notifier: Notifier, + /// File descriptor for the timerfd that produces timeouts. timer_fd: Option, } @@ -34,8 +38,8 @@ impl Poller { // Use `epoll_create1` with `EPOLL_CLOEXEC`. let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?; - // Set up eventfd and timerfd. - let event_fd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?; + // Set up notifier and timerfd. + let notifier = Notifier::new()?; let timer_fd = timerfd_create( TimerfdClockId::Monotonic, TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK, @@ -44,7 +48,7 @@ impl Poller { let poller = Poller { epoll_fd, - event_fd, + notifier, timer_fd, }; @@ -58,7 +62,7 @@ impl Poller { } poller.add( - poller.event_fd.as_raw_fd(), + poller.notifier.as_fd().as_raw_fd(), Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; @@ -66,7 +70,7 @@ impl Poller { tracing::trace!( epoll_fd = ?poller.epoll_fd.as_raw_fd(), - event_fd = ?poller.event_fd.as_raw_fd(), + notifier = ?poller.notifier, timer_fd = ?poller.timer_fd, "new", ); @@ -201,10 +205,9 @@ impl Poller { ); // Clear the notification (if received) and re-register interest in it. - let mut buf = [0u8; 8]; - let _ = read(&self.event_fd, &mut buf); + self.notifier.clear(); self.modify( - self.event_fd.as_fd(), + self.notifier.as_fd(), Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; @@ -216,12 +219,11 @@ impl Poller { let span = tracing::trace_span!( "notify", epoll_fd = ?self.epoll_fd.as_raw_fd(), - event_fd = ?self.event_fd.as_raw_fd(), + notifier = ?self.notifier, ); let _enter = span.enter(); - let buf: [u8; 8] = 1u64.to_ne_bytes(); - let _ = write(&self.event_fd, &buf); + self.notifier.notify(); Ok(()) } } @@ -243,7 +245,7 @@ impl Drop for Poller { let span = tracing::trace_span!( "drop", epoll_fd = ?self.epoll_fd.as_raw_fd(), - event_fd = ?self.event_fd.as_raw_fd(), + notifier = ?self.notifier, timer_fd = ?self.timer_fd ); let _enter = span.enter(); @@ -251,7 +253,7 @@ impl Drop for Poller { if let Some(timer_fd) = self.timer_fd.take() { let _ = self.delete(timer_fd.as_fd()); } - let _ = self.delete(self.event_fd.as_fd()); + let _ = self.delete(self.notifier.as_fd()); } } @@ -365,3 +367,97 @@ impl EventExtra { self.flags.contains(epoll::EventFlags::PRI) } } + +/// The notifier for Linux. +/// +/// Certain container runtimes do not expose eventfd to the client, as it relies on the host and +/// can be used to "escape" the container under certain conditions. Gramine is the prime example, +/// see [here](gramine). In this case, fall back to using a pipe. +/// +/// [gramine]: https://gramine.readthedocs.io/en/stable/manifest-syntax.html#allowing-eventfd +#[derive(Debug)] +enum Notifier { + /// The primary notifier, using eventfd. + EventFd(OwnedFd), + + /// The fallback notifier, using a pipe. + Pipe { + /// The read end of the pipe. + read_pipe: OwnedFd, + + /// The write end of the pipe. + write_pipe: OwnedFd, + }, +} + +impl Notifier { + /// Create a new notifier. + fn new() -> io::Result { + // Skip eventfd for testing if necessary. + if !cfg!(polling_test_epoll_pipe) { + // Try to create an eventfd. + match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) { + Ok(fd) => { + tracing::trace!("created eventfd for notifier"); + return Ok(Notifier::EventFd(fd)); + } + + Err(err) => { + tracing::warn!( + "eventfd() failed with error ({}), falling back to pipe", + err + ); + } + } + } + + let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| { + let (read, write) = pipe()?; + fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?; + fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?; + io::Result::Ok((read, write)) + })?; + + fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?; + Ok(Notifier::Pipe { + read_pipe: read, + write_pipe: write, + }) + } + + /// The file descriptor to register in the poller. + fn as_fd(&self) -> BorrowedFd<'_> { + match self { + Notifier::EventFd(fd) => fd.as_fd(), + Notifier::Pipe { + read_pipe: read, .. + } => read.as_fd(), + } + } + + /// Notify the poller. + fn notify(&self) { + match self { + Self::EventFd(fd) => { + let buf: [u8; 8] = 1u64.to_ne_bytes(); + let _ = write(fd, &buf); + } + + Self::Pipe { write_pipe, .. } => { + write(write_pipe, &[0; 1]).ok(); + } + } + } + + /// Clear the notification. + fn clear(&self) { + match self { + Self::EventFd(fd) => { + let mut buf = [0u8; 8]; + let _ = read(fd, &mut buf); + } + + Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {}, + } + } +}