Skip to content

Commit

Permalink
irrelevantly fast parser
Browse files Browse the repository at this point in the history
  • Loading branch information
remi-dupre committed Feb 17, 2024
1 parent 474dc6b commit e5a0b76
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 79 deletions.
9 changes: 5 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

- autocomplete
- light shell scripts for wrapper commands
- option : record
- option : debug mode
- option : configure truncation of logs
- perf : no more untagged enums
- perf : precise event subscription
- fix : shrinking terminal f..ks up old logs
- spinner with no new thread through tokio task
- distribute via cargo

# Done

Expand All @@ -19,3 +16,7 @@
- implement a wrapper command
- multi-state progress bars
- easy nixos install through a flake?
- perf : no more untagged enums
- spinner with no new thread through tokio task
- option : record
- option : debug mode
2 changes: 1 addition & 1 deletion benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;

use divan::Bencher;
use pinix::action::Action;
use pinix::parse::action_raw::RawAction;
use pinix::action_raw::RawAction;

fn main() {
divan::main();
Expand Down
6 changes: 3 additions & 3 deletions src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Context;
use serde::Deserialize;
use serde_repr::Deserialize_repr;

use crate::parse::action_raw::RawAction;
use crate::action_raw::RawAction;

// ---
// --- ActionType
Expand Down Expand Up @@ -144,8 +144,8 @@ pub enum Action<'a> {
}

impl<'a> Action<'a> {
pub fn parse(s: &'a str) -> anyhow::Result<Self> {
let raw: RawAction = serde_json::from_str(s).context("Could not parse raw JSON")?;
pub fn parse(buf: &'a str) -> anyhow::Result<Self> {
let raw: RawAction = serde_json::from_str(buf).context("Could not parse raw JSON")?;
raw.try_into().context("Could not convert raw action")
}
}
File renamed without changes.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod action;
pub mod action_raw;
pub mod handlers;
pub mod indicatif_ext;
pub mod parse;
pub mod state;
pub mod style;
pub mod wrapper;
2 changes: 0 additions & 2 deletions src/parse/mod.rs

This file was deleted.

53 changes: 24 additions & 29 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Instant;

use anyhow::Context;
use console::style;
use futures::TryStreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressFinish, ProgressStyle};
use tokio::io::{AsyncWriteExt, BufWriter};

Expand All @@ -16,7 +15,7 @@ use crate::handlers::downloads_group::handle_new_downloads_group;
use crate::handlers::message::handle_new_message;
use crate::handlers::unknown::handle_new_unknown;
use crate::wrapper::command::NixCommand;
use crate::wrapper::stream::{OutputStream, StreamedPipes};
use crate::wrapper::stream::{MergedStreams, OutputStream};

#[derive(Eq, PartialEq)]
pub enum HandlerResult {
Expand Down Expand Up @@ -162,9 +161,9 @@ impl<'s> State<'s> {
}
}

pub async fn monitor_logs(
pub async fn monitor_logs<'c>(
cmd: &NixCommand,
log_stream: impl StreamedPipes<'_>,
mut log_stream: MergedStreams<'c>,
) -> anyhow::Result<()> {
let mut state = State::new(cmd);
let start_time = Instant::now();
Expand All @@ -181,36 +180,32 @@ pub async fn monitor_logs(
}
};

log_stream
.try_fold(&mut record_file, move |mut record_file, (output, line)| {
let elapsed = start_time.elapsed();

let result = match output {
OutputStream::StdOut => state.println(&line),
OutputStream::StdErr => {
if let Some(action_str) = line.strip_prefix("@nix ") {
Action::parse(action_str).and_then(|action| state.handle(&action))
} else {
state.println(&line)
}
}
};
while let Some((output, line)) = log_stream.next_line().await? {
let line = std::str::from_utf8(line).context("invalid utf-8")?;

async move {
result?;
if let Some(file) = &mut record_file {
let elapsed = start_time.elapsed();
let line = format!("{} {:07} {line}\n", output.as_str(), elapsed.as_millis());

if let Some(file) = &mut record_file {
let line = format!("{} {:07} {line}\n", output.as_str(), elapsed.as_millis());
file.write_all(line.as_bytes())
.await
.context("error writing record file")?;
}

file.write_all(line.as_bytes())
.await
.context("error writing record file")?;
match output {
OutputStream::StdOut => {
state.println(line)?;
}
OutputStream::StdErr => {
if let Some(action_raw) = line.strip_prefix("@nix ") {
let action = Action::parse(action_raw)?;
state.handle(&action)?;
} else {
state.println(line)?
}

Ok(record_file)
}
})
.await?;
}
}

if let Some(mut file) = record_file {
file.flush().await.context("error saving record file")?;
Expand Down
4 changes: 2 additions & 2 deletions src/wrapper/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::process;

use crate::state::monitor_logs;

use super::stream::stream_child_output;
use super::stream::MergedStreams;

#[derive(Debug, clap::Parser)]
#[command(
Expand Down Expand Up @@ -171,7 +171,7 @@ impl NixCommand {
.context("failed to spawn command")?;

let logs_stream =
stream_child_output(&mut child).context("could not pipe command output")?;
MergedStreams::new(&mut child).context("could not pipe command output")?;

monitor_logs(self, logs_stream).await?;
let exit_code = child.wait().await.context("child command failed")?;
Expand Down
104 changes: 67 additions & 37 deletions src/wrapper/stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::str::FromStr;

use anyhow::Context;
use futures::{FutureExt, TryStream};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Child;
use futures::FutureExt;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdout};

pub enum OutputStream {
StdOut,
Expand Down Expand Up @@ -31,46 +31,76 @@ impl FromStr for OutputStream {
}
}

pub trait StreamedPipes<'a>:
TryStream<Ok = (OutputStream, String), Error = anyhow::Error> + 'a
{
/// Cancellable way to fetch lines from a reader
struct BorrowLines<R: AsyncRead + Unpin> {
reader: BufReader<R>,
buffer: Vec<u8>,
yielded: bool,
}

impl<'a, T> StreamedPipes<'a> for T where
T: TryStream<Ok = (OutputStream, String), Error = anyhow::Error> + 'a
{
impl<R: AsyncRead + Unpin> BorrowLines<R> {
fn new(reader: R) -> Self {
Self {
reader: BufReader::new(reader),
buffer: Vec::new(),
yielded: false,
}
}

async fn next_line(&mut self) -> anyhow::Result<Option<&[u8]>> {
if self.yielded {
self.yielded = false;
self.buffer.clear();
}

self.reader.read_until(b'\n', &mut self.buffer).await?;

if self.buffer.is_empty() {
Ok(None)
} else {
assert_eq!(self.buffer.last(), Some(&b'\n'));
self.yielded = true;
Ok(Some(self.buffer.as_slice()))
}
}
}

pub fn stream_child_output(child: &mut Child) -> anyhow::Result<impl StreamedPipes<'_>> {
let stdout = child
.stdout
.as_mut()
.context("could not read child command output")?;
pub struct MergedStreams<'c> {
stdout: BorrowLines<&'c mut ChildStdout>,
stderr: BorrowLines<&'c mut ChildStderr>,
}

let stderr = child
.stderr
.as_mut()
.context("could not read child command output")?;
impl<'c> MergedStreams<'c> {
pub fn new(child: &'c mut Child) -> anyhow::Result<Self> {
let stdout = child
.stdout
.as_mut()
.context("could not read child command output")?;

let stdout = BufReader::new(stdout).lines();
let stderr = BufReader::new(stderr).lines();
let stderr = child
.stderr
.as_mut()
.context("could not read child command output")?;

Ok(futures::stream::try_unfold(
(stdout, stderr),
|(mut stdout, mut stderr)| async move {
let res = tokio::select! {
Some(line) = stdout.next_line().map(Result::transpose) => {
(OutputStream::StdOut, line.context("could not read child's stdout")?)
}
Some(line) = stderr.next_line().map(Result::transpose) => {
(OutputStream::StdErr, line.context("could not read child's stderr")?)
}
else => {
return Ok(None)
}
};
Ok(Self {
stdout: BorrowLines::new(stdout),
stderr: BorrowLines::new(stderr),
})
}

Ok(Some((res, (stdout, stderr))))
},
))
pub async fn next_line(&mut self) -> anyhow::Result<Option<(OutputStream, &[u8])>> {
tokio::select! {
Some(line) = self.stdout.next_line().map(Result::transpose) => {
let line = line?;
Ok(Some((OutputStream::StdOut, line)))
}
Some(line) = self.stderr.next_line().map(Result::transpose) => {
let line = line?;
Ok(Some((OutputStream::StdErr, line)))
}
else => {
Ok(None)
}
}
}
}

0 comments on commit e5a0b76

Please sign in to comment.