diff --git a/Cargo.lock b/Cargo.lock index b0731cdba1357..f8c99ea4eb1ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1388,6 +1388,7 @@ checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ "bitflags 2.5.0", "crossterm_winapi", + "futures-core", "libc", "mio 0.8.11", "parking_lot", @@ -6569,6 +6570,7 @@ dependencies = [ "console", "crossterm 0.27.0", "dialoguer", + "futures", "indicatif", "indoc", "lazy_static", diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index 82b3a3debac5c..b3b6e5dada7c9 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -57,7 +57,7 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result Visitor<'a> { // Once we have the full picture we will go about grouping these pieces of data // together #[allow(clippy::too_many_arguments)] - pub fn new( + pub async fn new( package_graph: Arc, run_cache: Arc, run_tracker: RunTracker, @@ -133,8 +133,11 @@ impl<'a> Visitor<'a> { let sink = Self::sink(run_opts); let color_cache = ColorSelector::default(); // Set up correct size for underlying pty - if let Some(pane_size) = ui_sender.as_ref().and_then(|sender| sender.pane_size()) { - manager.set_pty_size(pane_size.rows, pane_size.cols); + + if let Some(app) = ui_sender.as_ref() { + if let Some(pane_size) = app.pane_size().await { + manager.set_pty_size(pane_size.rows, pane_size.cols); + } } Self { @@ -330,7 +333,7 @@ impl<'a> Visitor<'a> { if !self.is_watch { if let Some(handle) = &self.ui_sender { - handle.stop(); + handle.stop().await; } } diff --git a/crates/turborepo-ui/Cargo.toml b/crates/turborepo-ui/Cargo.toml index a75108233c79f..9c0a0caf35c5b 100644 --- a/crates/turborepo-ui/Cargo.toml +++ b/crates/turborepo-ui/Cargo.toml @@ -24,8 +24,9 @@ axum-server = { workspace = true } base64 = "0.22" chrono = { workspace = true } console = { workspace = true } -crossterm = "0.27.0" +crossterm = { version = "0.27.0", features = ["event-stream"] } dialoguer = { workspace = true } +futures = { workspace = true } indicatif = { workspace = true } lazy_static = { workspace = true } nix = { version = "0.26.2", features = ["signal"] } @@ -34,7 +35,6 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } - tracing = { workspace = true } tui-term = { workspace = true } turbopath = { workspace = true } diff --git a/crates/turborepo-ui/src/sender.rs b/crates/turborepo-ui/src/sender.rs index a7b3c11d39be1..0fd6f84690bb6 100644 --- a/crates/turborepo-ui/src/sender.rs +++ b/crates/turborepo-ui/src/sender.rs @@ -62,9 +62,9 @@ impl UISender { UISender::Wui(sender) => sender.task(task), } } - pub fn stop(&self) { + pub async fn stop(&self) { match self { - UISender::Tui(sender) => sender.stop(), + UISender::Tui(sender) => sender.stop().await, UISender::Wui(sender) => sender.stop(), } } @@ -75,9 +75,9 @@ impl UISender { } } - pub fn pane_size(&self) -> Option { + pub async fn pane_size(&self) -> Option { match self { - UISender::Tui(sender) => sender.pane_size(), + UISender::Tui(sender) => sender.pane_size().await, // Not applicable to the web UI UISender::Wui(_) => None, } diff --git a/crates/turborepo-ui/src/tui/app.rs b/crates/turborepo-ui/src/tui/app.rs index 4d83a65d10a87..8ba6e9a8d2539 100644 --- a/crates/turborepo-ui/src/tui/app.rs +++ b/crates/turborepo-ui/src/tui/app.rs @@ -2,8 +2,7 @@ use std::{ collections::BTreeMap, io::{self, Stdout, Write}, mem, - sync::mpsc, - time::{Duration, Instant}, + time::Duration, }; use ratatui::{ @@ -12,9 +11,13 @@ use ratatui::{ widgets::TableState, Frame, Terminal, }; +use tokio::{ + sync::{mpsc, oneshot}, + time::Instant, +}; use tracing::{debug, trace}; -const FRAMERATE: Duration = Duration::from_millis(3); +pub const FRAMERATE: Duration = Duration::from_millis(3); const RESIZE_DEBOUNCE_DELAY: Duration = Duration::from_millis(10); use super::{ @@ -43,7 +46,6 @@ pub struct App { tasks: BTreeMap>, tasks_by_status: TasksByStatus, focus: LayoutSections, - tty_stdin: bool, scroll: TableState, selected_task_index: usize, has_user_scrolled: bool, @@ -78,8 +80,6 @@ impl App { size, done: false, focus: LayoutSections::TaskList, - // Check if stdin is a tty that we should read input from - tty_stdin: atty::is(atty::Stream::Stdin), tasks: tasks_by_status .task_names_in_displayed_order() .map(|task_name| { @@ -112,7 +112,6 @@ impl App { let has_selection = self.get_full_task()?.has_selection(); Ok(InputOptions { focus: &self.focus, - tty_stdin: self.tty_stdin, has_selection, }) } @@ -558,16 +557,19 @@ impl App { /// Handle the rendering of the `App` widget based on events received by /// `receiver` -pub fn run_app(tasks: Vec, receiver: AppReceiver) -> Result<(), Error> { +pub async fn run_app(tasks: Vec, receiver: AppReceiver) -> Result<(), Error> { let mut terminal = startup()?; let size = terminal.size()?; let mut app: App> = App::new(size.height, size.width, tasks); + let (crossterm_tx, crossterm_rx) = mpsc::channel(1024); + input::start_crossterm_stream(crossterm_tx); - let (result, callback) = match run_app_inner(&mut terminal, &mut app, receiver) { - Ok(callback) => (Ok(()), callback), - Err(err) => (Err(err), None), - }; + let (result, callback) = + match run_app_inner(&mut terminal, &mut app, receiver, crossterm_rx).await { + Ok(callback) => (Ok(()), callback), + Err(err) => (Err(err), None), + }; cleanup(terminal, app, callback)?; @@ -576,18 +578,19 @@ pub fn run_app(tasks: Vec, receiver: AppReceiver) -> Result<(), Error> { // Break out inner loop so we can use `?` without worrying about cleaning up the // terminal. -fn run_app_inner( +async fn run_app_inner( terminal: &mut Terminal, app: &mut App>, - receiver: AppReceiver, -) -> Result>, Error> { + mut receiver: AppReceiver, + mut crossterm_rx: mpsc::Receiver, +) -> Result>, Error> { // Render initial state to paint the screen terminal.draw(|f| view(app, f))?; let mut last_render = Instant::now(); let mut resize_debouncer = Debouncer::new(RESIZE_DEBOUNCE_DELAY); let mut callback = None; let mut needs_rerender = true; - while let Some(event) = poll(app.input_options()?, &receiver, last_render + FRAMERATE) { + while let Some(event) = poll(app.input_options()?, &mut receiver, &mut crossterm_rx).await { // If we only receive ticks, then there's been no state change so no update // needed if !matches!(event, Event::Tick) { @@ -625,13 +628,31 @@ fn run_app_inner( /// Blocking poll for events, will only return None if app handle has been /// dropped -fn poll(input_options: InputOptions, receiver: &AppReceiver, deadline: Instant) -> Option { - match input(input_options) { - Ok(Some(event)) => Some(event), - Ok(None) => receiver.recv(deadline).ok(), - // Unable to read from stdin, shut down and attempt to clean up - Err(_) => Some(Event::InternalStop), - } +async fn poll<'a>( + input_options: InputOptions<'a>, + receiver: &mut AppReceiver, + crossterm_rx: &mut mpsc::Receiver, +) -> Option { + let input_closed = crossterm_rx.is_closed(); + let input_fut = async { + crossterm_rx + .recv() + .await + .and_then(|event| input_options.handle_crossterm_event(event)) + }; + let receiver_fut = async { receiver.recv().await }; + let event_fut = async move { + if input_closed { + receiver_fut.await + } else { + tokio::select! { + e = input_fut => e, + e = receiver_fut => e, + } + } + }; + + event_fut.await } const MIN_HEIGHT: u16 = 10; @@ -672,7 +693,7 @@ fn startup() -> io::Result>> { fn cleanup( mut terminal: Terminal, mut app: App>, - callback: Option>, + callback: Option>, ) -> io::Result<()> { terminal.clear()?; crossterm::execute!( @@ -692,7 +713,7 @@ fn cleanup( fn update( app: &mut App>, event: Event, -) -> Result>, Error> { +) -> Result>, Error> { match event { Event::StartTask { task, output_logs } => { app.start_task(&task, output_logs)?; diff --git a/crates/turborepo-ui/src/tui/event.rs b/crates/turborepo-ui/src/tui/event.rs index 4e6365b9b1cc2..4f609f9a59028 100644 --- a/crates/turborepo-ui/src/tui/event.rs +++ b/crates/turborepo-ui/src/tui/event.rs @@ -1,5 +1,6 @@ use async_graphql::Enum; use serde::Serialize; +use tokio::sync::oneshot; pub enum Event { StartTask { @@ -19,8 +20,8 @@ pub enum Event { status: String, result: CacheResult, }, - PaneSizeQuery(std::sync::mpsc::SyncSender), - Stop(std::sync::mpsc::SyncSender<()>), + PaneSizeQuery(oneshot::Sender), + Stop(oneshot::Sender<()>), // Stop initiated by the TUI itself InternalStop, Tick, diff --git a/crates/turborepo-ui/src/tui/handle.rs b/crates/turborepo-ui/src/tui/handle.rs index 2b8eccff3cc44..f69c913cf0713 100644 --- a/crates/turborepo-ui/src/tui/handle.rs +++ b/crates/turborepo-ui/src/tui/handle.rs @@ -1,6 +1,7 @@ -use std::{sync::mpsc, time::Instant}; +use tokio::sync::{mpsc, oneshot}; use super::{ + app::FRAMERATE, event::{CacheResult, OutputLogs, PaneSize}, Error, Event, TaskResult, }; @@ -9,12 +10,12 @@ use crate::sender::{TaskSender, UISender}; /// Struct for sending app events to TUI rendering #[derive(Debug, Clone)] pub struct TuiSender { - primary: mpsc::Sender, + primary: mpsc::UnboundedSender, } /// Struct for receiving app events pub struct AppReceiver { - primary: mpsc::Receiver, + primary: mpsc::UnboundedReceiver, } impl TuiSender { @@ -23,7 +24,17 @@ impl TuiSender { /// AppSender is meant to be held by the actual task runner /// AppReceiver should be passed to `crate::tui::run_app` pub fn new() -> (Self, AppReceiver) { - let (primary_tx, primary_rx) = mpsc::channel(); + let (primary_tx, primary_rx) = mpsc::unbounded_channel(); + let tick_sender = primary_tx.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(FRAMERATE); + loop { + interval.tick().await; + if tick_sender.send(Event::Tick).is_err() { + break; + } + } + }); ( Self { primary: primary_tx, @@ -70,13 +81,13 @@ impl TuiSender { } /// Stop rendering TUI and restore terminal to default configuration - pub fn stop(&self) { - let (callback_tx, callback_rx) = mpsc::sync_channel(1); + pub async fn stop(&self) { + let (callback_tx, callback_rx) = oneshot::channel(); // Send stop event, if receiver has dropped ignore error as // it'll be a no-op. self.primary.send(Event::Stop(callback_tx)).ok(); // Wait for callback to be sent or the channel closed. - callback_rx.recv().ok(); + callback_rx.await.ok(); } /// Update the list of tasks displayed in the TUI @@ -103,23 +114,19 @@ impl TuiSender { } /// Fetches the size of the terminal pane - pub fn pane_size(&self) -> Option { - let (callback_tx, callback_rx) = mpsc::sync_channel(1); + pub async fn pane_size(&self) -> Option { + let (callback_tx, callback_rx) = oneshot::channel(); // Send query, if no receiver to handle the request return None self.primary.send(Event::PaneSizeQuery(callback_tx)).ok()?; // Wait for callback to be sent - callback_rx.recv().ok() + callback_rx.await.ok() } } impl AppReceiver { /// Receive an event, producing a tick event if no events are rec eived by /// the deadline. - pub fn recv(&self, deadline: Instant) -> Result { - match self.primary.recv_deadline(deadline) { - Ok(event) => Ok(event), - Err(mpsc::RecvTimeoutError::Timeout) => Ok(Event::Tick), - Err(mpsc::RecvTimeoutError::Disconnected) => Err(mpsc::RecvError), - } + pub async fn recv(&mut self) -> Option { + self.primary.recv().await } } diff --git a/crates/turborepo-ui/src/tui/input.rs b/crates/turborepo-ui/src/tui/input.rs index 0dcc585bf00c3..6fca06a13505c 100644 --- a/crates/turborepo-ui/src/tui/input.rs +++ b/crates/turborepo-ui/src/tui/input.rs @@ -1,45 +1,51 @@ -use std::time::Duration; - -use crossterm::event::{KeyCode, KeyEvent, KeyEventKind, KeyModifiers}; +use crossterm::event::{EventStream, KeyCode, KeyEvent, KeyEventKind, KeyModifiers}; +use futures::StreamExt; +use tokio::{sync::mpsc, task::JoinHandle}; use super::{ app::LayoutSections, event::{Direction, Event}, - Error, }; #[derive(Debug, Clone, Copy)] pub struct InputOptions<'a> { pub focus: &'a LayoutSections, - pub tty_stdin: bool, pub has_selection: bool, } -/// Return any immediately available event -pub fn input(options: InputOptions) -> Result, Error> { - // If stdin is not a tty, then we do not attempt to read from it - if !options.tty_stdin { - return Ok(None); +pub fn start_crossterm_stream(tx: mpsc::Sender) -> Option> { + // quick check if stdin is tty + if !atty::is(atty::Stream::Stdin) { + return None; } - // poll with 0 duration will only return true if event::read won't need to wait - // for input - if crossterm::event::poll(Duration::from_millis(0))? { - match crossterm::event::read()? { - crossterm::event::Event::Key(k) => Ok(translate_key_event(options, k)), + + let mut events = EventStream::new(); + Some(tokio::spawn(async move { + while let Some(Ok(event)) = events.next().await { + if tx.send(event).await.is_err() { + break; + } + } + })) +} + +impl<'a> InputOptions<'a> { + /// Maps a crossterm::event::Event to a tui::Event + pub fn handle_crossterm_event(self, event: crossterm::event::Event) -> Option { + match event { + crossterm::event::Event::Key(k) => translate_key_event(self, k), crossterm::event::Event::Mouse(m) => match m.kind { - crossterm::event::MouseEventKind::ScrollDown => Ok(Some(Event::ScrollDown)), - crossterm::event::MouseEventKind::ScrollUp => Ok(Some(Event::ScrollUp)), + crossterm::event::MouseEventKind::ScrollDown => Some(Event::ScrollDown), + crossterm::event::MouseEventKind::ScrollUp => Some(Event::ScrollUp), crossterm::event::MouseEventKind::Down(crossterm::event::MouseButton::Left) | crossterm::event::MouseEventKind::Drag(crossterm::event::MouseButton::Left) => { - Ok(Some(Event::Mouse(m))) + Some(Event::Mouse(m)) } - _ => Ok(None), + _ => None, }, - crossterm::event::Event::Resize(cols, rows) => Ok(Some(Event::Resize { rows, cols })), - _ => Ok(None), + crossterm::event::Event::Resize(cols, rows) => Some(Event::Resize { rows, cols }), + _ => None, } - } else { - Ok(None) } } diff --git a/crates/turborepo-ui/src/tui/mod.rs b/crates/turborepo-ui/src/tui/mod.rs index 6b0ce05538777..5e0995829d507 100644 --- a/crates/turborepo-ui/src/tui/mod.rs +++ b/crates/turborepo-ui/src/tui/mod.rs @@ -17,7 +17,7 @@ use clipboard::copy_to_clipboard; use debouncer::Debouncer; use event::{Event, TaskResult}; pub use handle::{AppReceiver, TuiSender}; -use input::{input, InputOptions}; +use input::InputOptions; pub use pane::TerminalPane; use size::SizeInfo; pub use table::TaskTable; diff --git a/crates/turborepo-ui/src/tui/search.rs b/crates/turborepo-ui/src/tui/search.rs index eabced60bc0e5..23044a5bc09b9 100644 --- a/crates/turborepo-ui/src/tui/search.rs +++ b/crates/turborepo-ui/src/tui/search.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, rc::Rc}; +use std::{collections::HashSet, sync::Arc}; use super::task::TasksByStatus; @@ -9,8 +9,8 @@ pub struct SearchResults { // - Rc for cheap clones since elements in `matches` will always be in `tasks` as well // - Rc implements Borrow meaning we can query a `HashSet>` using a `&str` // We do not modify the provided task names so we do not need the capabilities of String. - tasks: Vec>, - matches: HashSet>, + tasks: Vec>, + matches: HashSet>, } impl SearchResults { @@ -18,7 +18,7 @@ impl SearchResults { Self { tasks: tasks .task_names_in_displayed_order() - .map(Rc::from) + .map(Arc::from) .collect(), query: String::new(), matches: HashSet::new(), @@ -29,7 +29,7 @@ impl SearchResults { pub fn update_tasks(&mut self, tasks: &TasksByStatus) { self.tasks.clear(); self.tasks - .extend(tasks.task_names_in_displayed_order().map(Rc::from)); + .extend(tasks.task_names_in_displayed_order().map(Arc::from)); self.update_matches(); }