From e5a0b76e70da66fbaa1ffd60b64312ddb333971b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Dupr=C3=A9?= Date: Sat, 17 Feb 2024 15:08:37 +0100 Subject: [PATCH] irrelevantly fast parser --- TODO.md | 9 +-- benches/parser.rs | 2 +- src/action.rs | 6 +- src/{parse => }/action_raw.rs | 0 src/lib.rs | 2 +- src/parse/mod.rs | 2 - src/state.rs | 53 ++++++++--------- src/wrapper/command.rs | 4 +- src/wrapper/stream.rs | 104 ++++++++++++++++++++++------------ 9 files changed, 103 insertions(+), 79 deletions(-) rename src/{parse => }/action_raw.rs (100%) delete mode 100644 src/parse/mod.rs diff --git a/TODO.md b/TODO.md index 0b04b4b..65ea8fd 100644 --- a/TODO.md +++ b/TODO.md @@ -2,13 +2,10 @@ - autocomplete - light shell scripts for wrapper commands -- option : record -- option : debug mode - option : configure truncation of logs -- perf : no more untagged enums - perf : precise event subscription - fix : shrinking terminal f..ks up old logs -- spinner with no new thread through tokio task +- distribute via cargo # Done @@ -19,3 +16,7 @@ - implement a wrapper command - multi-state progress bars - easy nixos install through a flake? +- perf : no more untagged enums +- spinner with no new thread through tokio task +- option : record +- option : debug mode diff --git a/benches/parser.rs b/benches/parser.rs index 385c97f..94a9fc4 100644 --- a/benches/parser.rs +++ b/benches/parser.rs @@ -4,7 +4,7 @@ use std::path::PathBuf; use divan::Bencher; use pinix::action::Action; -use pinix::parse::action_raw::RawAction; +use pinix::action_raw::RawAction; fn main() { divan::main(); diff --git a/src/action.rs b/src/action.rs index 72f882b..1b144d5 100644 --- a/src/action.rs +++ b/src/action.rs @@ -6,7 +6,7 @@ use anyhow::Context; use serde::Deserialize; use serde_repr::Deserialize_repr; -use crate::parse::action_raw::RawAction; +use crate::action_raw::RawAction; // --- // --- ActionType @@ -144,8 +144,8 @@ pub enum Action<'a> { } impl<'a> Action<'a> { - pub fn parse(s: &'a str) -> anyhow::Result { - let raw: RawAction = serde_json::from_str(s).context("Could not parse raw JSON")?; + pub fn parse(buf: &'a str) -> anyhow::Result { + let raw: RawAction = serde_json::from_str(buf).context("Could not parse raw JSON")?; raw.try_into().context("Could not convert raw action") } } diff --git a/src/parse/action_raw.rs b/src/action_raw.rs similarity index 100% rename from src/parse/action_raw.rs rename to src/action_raw.rs diff --git a/src/lib.rs b/src/lib.rs index 53cbe37..a52f0a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod action; +pub mod action_raw; pub mod handlers; pub mod indicatif_ext; -pub mod parse; pub mod state; pub mod style; pub mod wrapper; diff --git a/src/parse/mod.rs b/src/parse/mod.rs deleted file mode 100644 index f33baf7..0000000 --- a/src/parse/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod action_raw; - diff --git a/src/state.rs b/src/state.rs index 5861214..57e6853 100644 --- a/src/state.rs +++ b/src/state.rs @@ -3,7 +3,6 @@ use std::time::Instant; use anyhow::Context; use console::style; -use futures::TryStreamExt; use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle}; use tokio::io::{AsyncWriteExt, BufWriter}; @@ -16,7 +15,7 @@ use crate::handlers::downloads_group::handle_new_downloads_group; use crate::handlers::message::handle_new_message; use crate::handlers::unknown::handle_new_unknown; use crate::wrapper::command::NixCommand; -use crate::wrapper::stream::{OutputStream, StreamedPipes}; +use crate::wrapper::stream::{MergedStreams, OutputStream}; #[derive(Eq, PartialEq)] pub enum HandlerResult { @@ -162,9 +161,9 @@ impl<'s> State<'s> { } } -pub async fn monitor_logs( +pub async fn monitor_logs<'c>( cmd: &NixCommand, - log_stream: impl StreamedPipes<'_>, + mut log_stream: MergedStreams<'c>, ) -> anyhow::Result<()> { let mut state = State::new(cmd); let start_time = Instant::now(); @@ -181,36 +180,32 @@ pub async fn monitor_logs( } }; - log_stream - .try_fold(&mut record_file, move |mut record_file, (output, line)| { - let elapsed = start_time.elapsed(); - - let result = match output { - OutputStream::StdOut => state.println(&line), - OutputStream::StdErr => { - if let Some(action_str) = line.strip_prefix("@nix ") { - Action::parse(action_str).and_then(|action| state.handle(&action)) - } else { - state.println(&line) - } - } - }; + while let Some((output, line)) = log_stream.next_line().await? { + let line = std::str::from_utf8(line).context("invalid utf-8")?; - async move { - result?; + if let Some(file) = &mut record_file { + let elapsed = start_time.elapsed(); + let line = format!("{} {:07} {line}\n", output.as_str(), elapsed.as_millis()); - if let Some(file) = &mut record_file { - let line = format!("{} {:07} {line}\n", output.as_str(), elapsed.as_millis()); + file.write_all(line.as_bytes()) + .await + .context("error writing record file")?; + } - file.write_all(line.as_bytes()) - .await - .context("error writing record file")?; + match output { + OutputStream::StdOut => { + state.println(line)?; + } + OutputStream::StdErr => { + if let Some(action_raw) = line.strip_prefix("@nix ") { + let action = Action::parse(action_raw)?; + state.handle(&action)?; + } else { + state.println(line)? } - - Ok(record_file) } - }) - .await?; + } + } if let Some(mut file) = record_file { file.flush().await.context("error saving record file")?; diff --git a/src/wrapper/command.rs b/src/wrapper/command.rs index ff920c9..342a971 100644 --- a/src/wrapper/command.rs +++ b/src/wrapper/command.rs @@ -9,7 +9,7 @@ use tokio::process; use crate::state::monitor_logs; -use super::stream::stream_child_output; +use super::stream::MergedStreams; #[derive(Debug, clap::Parser)] #[command( @@ -171,7 +171,7 @@ impl NixCommand { .context("failed to spawn command")?; let logs_stream = - stream_child_output(&mut child).context("could not pipe command output")?; + MergedStreams::new(&mut child).context("could not pipe command output")?; monitor_logs(self, logs_stream).await?; let exit_code = child.wait().await.context("child command failed")?; diff --git a/src/wrapper/stream.rs b/src/wrapper/stream.rs index d8547bc..18a1452 100644 --- a/src/wrapper/stream.rs +++ b/src/wrapper/stream.rs @@ -1,9 +1,9 @@ use std::str::FromStr; use anyhow::Context; -use futures::{FutureExt, TryStream}; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Child; +use futures::FutureExt; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::process::{Child, ChildStderr, ChildStdout}; pub enum OutputStream { StdOut, @@ -31,46 +31,76 @@ impl FromStr for OutputStream { } } -pub trait StreamedPipes<'a>: - TryStream + 'a -{ +/// Cancellable way to fetch lines from a reader +struct BorrowLines { + reader: BufReader, + buffer: Vec, + yielded: bool, } -impl<'a, T> StreamedPipes<'a> for T where - T: TryStream + 'a -{ +impl BorrowLines { + fn new(reader: R) -> Self { + Self { + reader: BufReader::new(reader), + buffer: Vec::new(), + yielded: false, + } + } + + async fn next_line(&mut self) -> anyhow::Result> { + if self.yielded { + self.yielded = false; + self.buffer.clear(); + } + + self.reader.read_until(b'\n', &mut self.buffer).await?; + + if self.buffer.is_empty() { + Ok(None) + } else { + assert_eq!(self.buffer.last(), Some(&b'\n')); + self.yielded = true; + Ok(Some(self.buffer.as_slice())) + } + } } -pub fn stream_child_output(child: &mut Child) -> anyhow::Result> { - let stdout = child - .stdout - .as_mut() - .context("could not read child command output")?; +pub struct MergedStreams<'c> { + stdout: BorrowLines<&'c mut ChildStdout>, + stderr: BorrowLines<&'c mut ChildStderr>, +} - let stderr = child - .stderr - .as_mut() - .context("could not read child command output")?; +impl<'c> MergedStreams<'c> { + pub fn new(child: &'c mut Child) -> anyhow::Result { + let stdout = child + .stdout + .as_mut() + .context("could not read child command output")?; - let stdout = BufReader::new(stdout).lines(); - let stderr = BufReader::new(stderr).lines(); + let stderr = child + .stderr + .as_mut() + .context("could not read child command output")?; - Ok(futures::stream::try_unfold( - (stdout, stderr), - |(mut stdout, mut stderr)| async move { - let res = tokio::select! { - Some(line) = stdout.next_line().map(Result::transpose) => { - (OutputStream::StdOut, line.context("could not read child's stdout")?) - } - Some(line) = stderr.next_line().map(Result::transpose) => { - (OutputStream::StdErr, line.context("could not read child's stderr")?) - } - else => { - return Ok(None) - } - }; + Ok(Self { + stdout: BorrowLines::new(stdout), + stderr: BorrowLines::new(stderr), + }) + } - Ok(Some((res, (stdout, stderr)))) - }, - )) + pub async fn next_line(&mut self) -> anyhow::Result> { + tokio::select! { + Some(line) = self.stdout.next_line().map(Result::transpose) => { + let line = line?; + Ok(Some((OutputStream::StdOut, line))) + } + Some(line) = self.stderr.next_line().map(Result::transpose) => { + let line = line?; + Ok(Some((OutputStream::StdErr, line))) + } + else => { + Ok(None) + } + } + } }