Skip to content

Commit

Permalink
fix(pty): use async io to avoid polling
Browse files Browse the repository at this point in the history
This patch fixes zellij-org#509 by using async read instead of polling a
non-blocking fd. This reduces CPU usage when the ptys are idle.
  • Loading branch information
kxt committed May 19, 2021
1 parent 68445af commit efaef8f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 90 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ structopt = "0.3"
interprocess = "1.1.1"
vte = "0.10.1"
nix = "0.19.1"
async-trait = "0.1.50"

[dependencies.async-std]
version = "1.3.0"
features = ["unstable"]

[dev-dependencies]
insta = "1.6.0"
Expand Down
36 changes: 35 additions & 1 deletion src/tests/fakes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes};
use crate::tests::utils::commands::{QUIT, SLEEP};
use async_trait::async_trait;
use interprocess::local_socket::LocalSocketStream;
use std::collections::{HashMap, VecDeque};
use std::io::Write;
Expand All @@ -8,7 +9,7 @@ use std::path::PathBuf;
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use zellij_client::os_input_output::ClientOsApi;
use zellij_server::os_input_output::{Pid, ServerOsApi};
use zellij_server::os_input_output::{AsyncReader, Pid, ServerOsApi};
use zellij_tile::data::Palette;
use zellij_utils::{
channels::{ChannelWithContext, SenderType, SenderWithContext},
Expand Down Expand Up @@ -224,6 +225,33 @@ impl ClientOsApi for FakeInputOutput {
}
}

struct FakeAsyncReader {
fd: RawFd,
os_api: Box<dyn ServerOsApi>,
}

#[async_trait]
impl AsyncReader for FakeAsyncReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
// simulates async semantics: EAGAIN is not propagated to caller
loop {
let res = self.os_api.read_from_tty_stdout(self.fd, buf);
match res {
Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => {
async_std::task::sleep(::std::time::Duration::from_millis(10)).await;
continue;
}
Err(e) => {
break Err(std::io::Error::from_raw_os_error(
e.as_errno().unwrap() as i32
))
}
Ok(n_bytes) => break Ok(n_bytes),
}
}
}
}

impl ServerOsApi for FakeInputOutput {
fn set_terminal_size_using_fd(&self, pid: RawFd, cols: u16, rows: u16) {
let terminal_input = self
Expand Down Expand Up @@ -274,6 +302,12 @@ impl ServerOsApi for FakeInputOutput {
None => Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)),
}
}
fn async_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> {
Box::new(FakeAsyncReader {
fd,
os_api: ServerOsApi::box_clone(self),
})
}
fn tcdrain(&self, pid: RawFd) -> Result<(), nix::Error> {
self.io_events.lock().unwrap().push(IoEvent::TcDrain(pid));
Ok(())
Expand Down
1 change: 1 addition & 0 deletions zellij-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ libc = "0.2"
serde_json = "1.0"
daemonize = "0.4.1"
interprocess = "1.1.1"
async-trait = "0.1.50"

[dependencies.async-std]
version = "1.3.0"
Expand Down
32 changes: 29 additions & 3 deletions zellij-server/src/os_input_output.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_std::fs::File as AsyncFile;
use async_std::os::unix::io::FromRawFd;
use interprocess::local_socket::LocalSocketStream;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::pty::{forkpty, Winsize};
use nix::sys::signal::{kill, Signal};
use nix::sys::termios;
Expand All @@ -18,7 +19,11 @@ use zellij_utils::ipc::{
};
use zellij_utils::shared::default_palette;

use async_std::io::ReadExt;
use async_trait::async_trait;

pub use nix::unistd::Pid;

pub(crate) fn set_terminal_size_using_fd(fd: RawFd, columns: u16, rows: u16) {
// TODO: do this with the nix ioctl
use libc::ioctl;
Expand Down Expand Up @@ -85,8 +90,6 @@ fn spawn_terminal(file_to_open: Option<PathBuf>, orig_termios: termios::Termios)
let pid_secondary = match fork_pty_res.fork_result {
ForkResult::Parent { child } => {
// fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::empty())).expect("could not fcntl");
fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
.expect("could not fcntl");
child
}
ForkResult::Child => match file_to_open {
Expand Down Expand Up @@ -130,6 +133,22 @@ pub struct ServerOsInputOutput {
send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ServerToClientMsg>>>>,
}

#[async_trait]
pub trait AsyncReader: Send + Sync {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
}

struct RawFdAsyncReader {
fd: async_std::fs::File,
}

#[async_trait]
impl AsyncReader for RawFdAsyncReader {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.fd.read(buf).await
}
}

/// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that
/// Zellij server requires.
pub trait ServerOsApi: Send + Sync {
Expand All @@ -139,6 +158,8 @@ pub trait ServerOsApi: Send + Sync {
fn spawn_terminal(&self, file_to_open: Option<PathBuf>) -> (RawFd, Pid);
/// Read bytes from the standard output of the virtual terminal referred to by `fd`.
fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error>;
///
fn async_reader(&self, fd: RawFd) -> Box<dyn AsyncReader>;
/// Write bytes to the standard input of the virtual terminal referred to by `fd`.
fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error>;
/// Wait until all output written to the object referred to by `fd` has been transmitted.
Expand Down Expand Up @@ -169,6 +190,11 @@ impl ServerOsApi for ServerOsInputOutput {
fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error> {
unistd::read(fd, buf)
}
fn async_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> {
Box::new(RawFdAsyncReader {
fd: unsafe { AsyncFile::from_raw_fd(fd) },
})
}
fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error> {
unistd::write(fd, buf)
}
Expand Down
115 changes: 29 additions & 86 deletions zellij-server/src/pty.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use async_std::stream::*;
use async_std::task;
use async_std::task::*;
use async_std::future::timeout as async_timeout;
use async_std::task::{self, JoinHandle};
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::pin::*;
use std::time::{Duration, Instant};

use crate::{
Expand All @@ -21,52 +19,6 @@ use zellij_utils::{
logging::debug_to_file,
};

pub struct ReadFromPid {
pid: RawFd,
os_input: Box<dyn ServerOsApi>,
}

impl ReadFromPid {
pub fn new(pid: &RawFd, os_input: Box<dyn ServerOsApi>) -> ReadFromPid {
ReadFromPid {
pid: *pid,
os_input,
}
}
}

impl Stream for ReadFromPid {
type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut read_buffer = [0; 65535];
let pid = self.pid;
let read_result = &self.os_input.read_from_tty_stdout(pid, &mut read_buffer);
match read_result {
Ok(res) => {
if *res == 0 {
// indicates end of file
Poll::Ready(None)
} else {
let res = Some(read_buffer[..*res].to_vec());
Poll::Ready(res)
}
}
Err(e) => {
match e {
nix::Error::Sys(errno) => {
if *errno == nix::errno::Errno::EAGAIN {
Poll::Ready(Some(vec![])) // TODO: better with timeout waker somehow
} else {
Poll::Ready(None)
}
}
_ => Poll::Ready(None),
}
}
}
}
}

pub type VteBytes = Vec<u8>;

/// Instructions related to PTYs (pseudoterminals).
Expand Down Expand Up @@ -168,50 +120,41 @@ fn stream_terminal_bytes(
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
let mut terminal_bytes = ReadFromPid::new(&pid, os_input);

let mut last_byte_receive_time: Option<Instant> = None;
let mut pending_render = false;
let max_render_pause = Duration::from_millis(30);
let infinity = Duration::from_secs(120);
let mut buf = [0u8; 65536];

while let Some(bytes) = terminal_bytes.next().await {
let bytes_is_empty = bytes.is_empty();
if debug {
debug_to_file(&bytes, pid).unwrap();
}
if !bytes_is_empty {
let _ = senders.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes));
// for UX reasons, if we got something on the wire, we only send the render notice if:
// 1. there aren't any more bytes on the wire afterwards
// 2. a certain period (currently 30ms) has elapsed since the last render
// (otherwise if we get a large amount of data, the display would hang
// until it's done)
// 3. the stream has ended, and so we render 1 last time
match last_byte_receive_time.as_mut() {
Some(receive_time) => {
if receive_time.elapsed() > max_render_pause {
pending_render = false;
let _ = senders.send_to_screen(ScreenInstruction::Render);
last_byte_receive_time = Some(Instant::now());
} else {
pending_render = true;
let mut async_reader = os_input.async_reader(pid);
'read_loop: loop {
// For each read, we keep on reading additional data up to a duration of
// max_render_pause. This is in order to batch up PtyBytes before rendering them.
let deadline = Instant::now() + max_render_pause;
let mut timeout = infinity;
'batcher_loop: while let Ok(res) =
async_timeout(timeout, async_reader.read(&mut buf)).await
{
match res {
Ok(0) | Err(_) => break 'read_loop, // eof or error
Ok(n_bytes) => {
let bytes = &buf[..n_bytes];
if debug {
let _ = debug_to_file(bytes, pid);
}
let _ = senders
.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes.to_vec()));
timeout = match deadline.checked_duration_since(Instant::now()) {
None => break 'batcher_loop,
Some(t) => t,
}
}
None => {
last_byte_receive_time = Some(Instant::now());
pending_render = true;
}
};
} else {
if pending_render {
pending_render = false;
let _ = senders.send_to_screen(ScreenInstruction::Render);
}
last_byte_receive_time = None;
task::sleep(::std::time::Duration::from_millis(10)).await;
}
let _ = senders.send_to_screen(ScreenInstruction::Render);
task::sleep(::std::time::Duration::from_millis(10)).await;
}
senders.send_to_screen(ScreenInstruction::Render).unwrap();
let _ = senders.send_to_screen(ScreenInstruction::Render);

#[cfg(not(any(feature = "test", test)))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data
Expand Down

0 comments on commit efaef8f

Please sign in to comment.