Skip to content

Commit

Permalink
Merge pull request #895 from tavianator/receiver-buffer
Browse files Browse the repository at this point in the history
walk: Encapsulate the buffering behavior in a struct
  • Loading branch information
tavianator authored Dec 5, 2021
2 parents 1a6638b + a4bb734 commit 7fe4bfa
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 86 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dirs-next = "2.0"
normpath = "0.3"
chrono = "0.4"
once_cell = "1.8.0"
crossbeam-channel = "0.5.1"

[dependencies.clap]
version = "2.34.0"
Expand Down
3 changes: 2 additions & 1 deletion src/exec/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

use crossbeam_channel::Receiver;

use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
Expand Down
245 changes: 160 additions & 85 deletions src/walk.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::ffi::OsStr;
use std::fs::{FileType, Metadata};
use std::io;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io::Write};

use anyhow::{anyhow, Result};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use ignore::overrides::OverrideBuilder;
use ignore::{self, WalkBuilder};
use once_cell::unsync::OnceCell;
Expand All @@ -23,6 +24,7 @@ use crate::filesystem;
use crate::output;

/// The receiver thread can either be buffering results or directly streaming to the console.
#[derive(PartialEq)]
enum ReceiverMode {
/// Receiver is still buffering in order to sort the results, if the search finishes fast
/// enough.
Expand All @@ -41,7 +43,7 @@ pub enum WorkerResult {
/// Maximum size of the output buffer before flushing results to the console
pub const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
pub const DEFAULT_MAX_BUFFER_TIME: time::Duration = time::Duration::from_millis(100);
pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);

/// Recursively scan the given search path for files / pathnames matching the pattern.
///
Expand All @@ -53,7 +55,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
let first_path_buf = path_iter
.next()
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let (tx, rx) = unbounded();

let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());

Expand Down Expand Up @@ -160,6 +162,157 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
}
}

/// Wrapper for the receiver thread's buffering behavior.
///
/// Holds everything the receiver needs to turn worker results into output:
/// results are first collected in `buffer` and, once the deadline passes or
/// the buffer grows past `MAX_BUFFER_LENGTH`, written directly to `stdout`.
/// The methods on this type require `W: Write`.
struct ReceiverBuffer<W> {
/// The configuration.
///
/// Consulted for `max_buffer_time`, `quiet`, `max_results`,
/// `show_filesystem_errors` and `interactive_terminal`.
config: Arc<Config>,
/// The ^C notifier.
///
/// Not inspected by this type itself; it is handed to
/// `output::print_entry` for every path that is printed.
wants_to_quit: Arc<AtomicBool>,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
/// Standard output.
///
/// Any writer works here; paths are printed to it and it is flushed
/// explicitly only when `config.interactive_terminal` is set.
stdout: W,
/// The current buffer mode.
///
/// Starts as `ReceiverMode::Buffering` and becomes
/// `ReceiverMode::Streaming` the first time `stream()` runs.
mode: ReceiverMode,
/// The deadline to switch to streaming mode.
///
/// Fixed at construction time; used as the receive deadline while buffering.
deadline: Instant,
/// The buffer of quickly received paths.
///
/// Sorted before printing if the search finishes while still buffering.
buffer: Vec<PathBuf>,
/// Result count.
///
/// Incremented once per received entry and compared against
/// `config.max_results`.
num_results: usize,
}

impl<W: Write> ReceiverBuffer<W> {
    /// Build a receiver buffer that starts out in buffering mode.
    ///
    /// The switch-to-streaming deadline is fixed here: the current instant
    /// plus `config.max_buffer_time`, or `DEFAULT_MAX_BUFFER_TIME` when the
    /// configuration does not specify one.
    fn new(
        config: Arc<Config>,
        wants_to_quit: Arc<AtomicBool>,
        rx: Receiver<WorkerResult>,
        stdout: W,
    ) -> Self {
        let buffering_window = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);

        Self {
            config,
            wants_to_quit,
            rx,
            stdout,
            mode: ReceiverMode::Buffering,
            deadline: Instant::now() + buffering_window,
            buffer: Vec::with_capacity(MAX_BUFFER_LENGTH),
            num_results: 0,
        }
    }

    /// Keep polling until `poll` reports an exit code, then return it.
    fn process(&mut self) -> ExitCode {
        loop {
            match self.poll() {
                Ok(()) => {}
                Err(exit_code) => return exit_code,
            }
        }
    }

    /// Fetch the next worker result from the channel.
    ///
    /// While buffering, the wait is bounded by `self.deadline`; once
    /// streaming, this blocks until a result arrives or the channel is
    /// disconnected.
    fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
        match self.mode {
            ReceiverMode::Buffering => self.rx.recv_deadline(self.deadline),
            ReceiverMode::Streaming => self.rx.recv().map_err(RecvTimeoutError::from),
        }
    }

    /// Handle a single event: a received entry, a worker error, the
    /// buffering deadline expiring, or the channel being disconnected.
    ///
    /// Returns `Ok(())` to keep going and `Err(code)` once processing
    /// should end with `code`.
    fn poll(&mut self) -> Result<(), ExitCode> {
        // Everything except a real entry is dealt with right here.
        let path = match self.recv() {
            Ok(WorkerResult::Entry(path)) => path,
            Ok(WorkerResult::Error(err)) => {
                if self.config.show_filesystem_errors {
                    print_error(err.to_string());
                }
                return Ok(());
            }
            // Deadline reached without a new result: start streaming.
            Err(RecvTimeoutError::Timeout) => return self.stream(),
            // All senders are gone: wrap up.
            Err(RecvTimeoutError::Disconnected) => return self.stop(),
        };

        // In quiet mode the first entry already settles the outcome.
        if self.config.quiet {
            return Err(ExitCode::HasResults(true));
        }

        match self.mode {
            ReceiverMode::Streaming => {
                self.print(&path);
                self.flush()?;
            }
            ReceiverMode::Buffering => {
                self.buffer.push(path);
                // An over-full buffer forces the switch to streaming.
                if self.buffer.len() > MAX_BUFFER_LENGTH {
                    self.stream()?;
                }
            }
        }

        self.num_results += 1;
        match self.config.max_results {
            Some(limit) if self.num_results >= limit => self.stop(),
            _ => Ok(()),
        }
    }

    /// Write one path to the output via `output::print_entry`.
    fn print(&mut self, path: &Path) {
        output::print_entry(&mut self.stdout, path, &self.config, &self.wants_to_quit)
    }

    /// Enter streaming mode, emitting whatever has been buffered so far.
    fn stream(&mut self) -> Result<(), ExitCode> {
        self.mode = ReceiverMode::Streaming;

        // Move the buffer out so `self` is free to be borrowed by `print`.
        for buffered in mem::take(&mut self.buffer) {
            self.print(&buffered);
        }

        self.flush()
    }

    /// Finish processing; always yields the final exit code as `Err`.
    ///
    /// If nothing has been streamed yet, the buffered paths are sorted and
    /// printed first.
    fn stop(&mut self) -> Result<(), ExitCode> {
        if matches!(self.mode, ReceiverMode::Buffering) {
            self.buffer.sort();
            self.stream()?;
        }

        let exit_code = if self.config.quiet {
            ExitCode::HasResults(self.num_results > 0)
        } else {
            ExitCode::Success
        };
        Err(exit_code)
    }

    /// Flush the output, but only for interactive terminals.
    fn flush(&mut self) -> Result<(), ExitCode> {
        if !self.config.interactive_terminal {
            return Ok(());
        }

        // A failed flush is probably a broken pipe. Exit gracefully.
        self.stdout.flush().map_err(|_| ExitCode::GeneralError)
    }
}

fn spawn_receiver(
config: &Arc<Config>,
wants_to_quit: &Arc<AtomicBool>,
Expand Down Expand Up @@ -218,90 +371,12 @@ fn spawn_receiver(
merge_exitcodes(exit_codes)
}
} else {
let start = time::Instant::now();

// Start in buffering mode
let mut mode = ReceiverMode::Buffering;

// Maximum time to wait before we start streaming to the console.
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);

let stdout = io::stdout();
let stdout = stdout.lock();
let mut stdout = io::BufWriter::new(stdout);

let mut num_results = 0;
let is_interactive = config.interactive_terminal;
let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH);
for worker_result in rx {
match worker_result {
WorkerResult::Entry(path) => {
if config.quiet {
return ExitCode::HasResults(true);
}
let stdout = io::BufWriter::new(stdout);

match mode {
ReceiverMode::Buffering => {
buffer.push(path);

// Have we reached the maximum buffer size or maximum buffering time?
if buffer.len() > MAX_BUFFER_LENGTH
|| start.elapsed() > max_buffer_time
{
// Flush the buffer
for path in &buffer {
output::print_entry(
&mut stdout,
path,
&config,
&wants_to_quit,
);
}
buffer.clear();
if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
// Start streaming
mode = ReceiverMode::Streaming;
}
}
ReceiverMode::Streaming => {
output::print_entry(&mut stdout, &path, &config, &wants_to_quit);
if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
}
}

num_results += 1;
if let Some(max_results) = config.max_results {
if num_results >= max_results {
break;
}
}
}
WorkerResult::Error(err) => {
if show_filesystem_errors {
print_error(err.to_string());
}
}
}
}

// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
buffer.sort();
for value in buffer {
output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
}

if config.quiet {
ExitCode::HasResults(false)
} else {
ExitCode::Success
}
let mut rxbuffer = ReceiverBuffer::new(config, wants_to_quit, rx, stdout);
rxbuffer.process()
}
})
}
Expand Down

0 comments on commit 7fe4bfa

Please sign in to comment.