Skip to content

Commit

Permalink
walk: Switch back to crossbeam-channel
Browse files Browse the repository at this point in the history
Fixes #933.  Fixes #1060.  Fixes #1113.
  • Loading branch information
tavianator authored and sharkdp committed Nov 1, 2022
1 parent 93e5488 commit 5bb7a52
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dirs-next = "2.0"
normpath = "0.3.2"
chrono = "0.4"
once_cell = "1.15.0"
crossbeam-channel = "0.5.6"

[dependencies.clap]
version = "3.1"
Expand Down
12 changes: 4 additions & 8 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

use crossbeam_channel::Receiver;

use crate::config::Config;
use crate::dir_entry::DirEntry;
use crate::error::print_error;
Expand All @@ -13,7 +14,7 @@ use super::CommandSet;
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Arc<Mutex<Receiver<WorkerResult>>>,
rx: Receiver<WorkerResult>,
cmd: Arc<CommandSet>,
out_perm: Arc<Mutex<()>>,
config: &Config,
Expand All @@ -23,12 +24,9 @@ pub fn job(

let mut results: Vec<ExitCode> = Vec::new();
loop {
// Create a lock on the shared receiver for this thread.
let lock = rx.lock().unwrap();

// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
let dir_entry: DirEntry = match lock.recv() {
let dir_entry: DirEntry = match rx.recv() {
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
Ok(WorkerResult::Error(err)) => {
if config.show_filesystem_errors {
Expand All @@ -39,8 +37,6 @@ pub fn job(
Err(_) => break,
};

// Drop the lock so that other threads can read from the receiver.
drop(lock);
// Generate a command, execute it and store its exit code.
results.push(cmd.execute(
dir_entry.stripped_path(config),
Expand Down
14 changes: 4 additions & 10 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::io;
use std::mem;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io::Write};

use anyhow::{anyhow, Result};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use ignore::overrides::OverrideBuilder;
use ignore::{self, WalkBuilder};
use regex::bytes::Regex;
Expand Down Expand Up @@ -51,7 +51,7 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);
/// path will simply be written to standard output.
pub fn scan(paths: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> Result<ExitCode> {
let first_path = &paths[0];
let (tx, rx) = channel();
let (tx, rx) = unbounded();

let mut override_builder = OverrideBuilder::new(first_path);

Expand Down Expand Up @@ -222,11 +222,7 @@ impl<W: Write> ReceiverBuffer<W> {
match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
let now = Instant::now();
self.deadline
.checked_duration_since(now)
.ok_or(RecvTimeoutError::Timeout)
.and_then(|t| self.rx.recv_timeout(t))
self.rx.recv_deadline(self.deadline)
}
ReceiverMode::Streaming => {
// Wait however long it takes for a result
Expand Down Expand Up @@ -345,15 +341,13 @@ fn spawn_receiver(
if cmd.in_batch_mode() {
exec::batch(rx, cmd, &config)
} else {
let shared_rx = Arc::new(Mutex::new(rx));

let out_perm = Arc::new(Mutex::new(()));

// Each spawned job will store it's thread handle in here.
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
let config = Arc::clone(&config);
let rx = Arc::clone(&shared_rx);
let rx = rx.clone();
let cmd = Arc::clone(cmd);
let out_perm = Arc::clone(&out_perm);

Expand Down

0 comments on commit 5bb7a52

Please sign in to comment.