Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow usage on wasm32-unknown-unknown target #194

Merged
merged 6 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 103 additions & 71 deletions src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ use crate::png::STD_STRATEGY;
use crate::png::STD_WINDOW;
#[cfg(not(feature = "parallel"))]
use crate::rayon;
#[cfg(not(feature = "parallel"))]
use crate::rayon::prelude::*;
use crate::Deadline;
#[cfg(feature = "parallel")]
use rayon;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
#[cfg(feature = "parallel")]
use std::sync::mpsc::*;
use std::sync::Arc;
use std::thread;
Expand All @@ -35,36 +33,112 @@ struct Candidate {
nth: usize,
}

#[derive(Default)]
struct Comparator {
best_result: Option<Candidate>,
}

impl Comparator {
fn evaluate(&mut self, new: Candidate) {
// a tie-breaker is required to make evaluation deterministic
let is_best = if let Some(ref old) = self.best_result {
// ordering is important - later file gets to use bias over earlier, but not the other way
// (this way bias=0 replaces, but doesn't forbid later optimizations)
let new_len = (new.image.idat_data.len() as f64
* if new.nth > old.nth {
f64::from(new.bias)
} else {
1.0
}) as usize;
let old_len = (old.image.idat_data.len() as f64
* if new.nth < old.nth {
f64::from(old.bias)
} else {
1.0
}) as usize;
// choose smallest compressed, or if compresses the same, smallest uncompressed, or cheaper filter
let new = (
new_len,
new.image.raw.data.len(),
new.image.raw.ihdr.bit_depth,
new.filter,
new.nth,
);
let old = (
old_len,
old.image.raw.data.len(),
old.image.raw.ihdr.bit_depth,
old.filter,
old.nth,
);
// <= instead of < is important, because best_candidate_size has been set already,
// so the current result may be comparing its size with itself
new <= old
} else {
true
};
if is_best {
self.best_result = if new.is_reduction { Some(new) } else { None };
}
}

fn get_result(self) -> Option<PngData> {
self.best_result.map(|res| res.image)
}
}

/// Collect image versions and pick one that compresses best
pub(crate) struct Evaluator {
deadline: Arc<Deadline>,
nth: AtomicUsize,
best_candidate_size: Arc<AtomicMin>,
/// images are sent to the thread for evaluation
eval_send: Option<SyncSender<Candidate>>,
#[cfg(feature = "parallel")]
eval_send: SyncSender<Candidate>,
// the thread helps evaluate images asynchronously
#[cfg(feature = "parallel")]
eval_thread: thread::JoinHandle<Option<PngData>>,
// in non-parallel mode, images are evaluated synchronously
#[cfg(not(feature = "parallel"))]
eval_comparator: std::cell::RefCell<Comparator>,
}

impl Evaluator {
pub fn new(deadline: Arc<Deadline>) -> Self {
#[cfg(feature = "parallel")]
let (tx, rx) = sync_channel(4);
Self {
deadline,
best_candidate_size: Arc::new(AtomicMin::new(None)),
nth: AtomicUsize::new(0),
eval_send: Some(tx),
eval_thread: thread::spawn(move || Self::evaluate_images(rx)),
#[cfg(feature = "parallel")]
eval_send: tx,
#[cfg(feature = "parallel")]
eval_thread: thread::spawn(move || {
let mut comparator = Comparator::default();
for candidate in rx {
comparator.evaluate(candidate);
}
comparator.get_result()
}),
#[cfg(not(feature = "parallel"))]
eval_comparator: Default::default(),
}
}

/// Wait for all evaluations to finish and return smallest reduction
/// Or `None` if all reductions were worse than baseline.
pub fn get_result(mut self) -> Option<PngData> {
let _ = self.eval_send.take(); // disconnect the sender, breaking the loop in the thread
#[cfg(feature = "parallel")]
pub fn get_result(self) -> Option<PngData> {
drop(self.eval_send); // disconnect the sender, breaking the loop in the thread
self.eval_thread.join().expect("eval thread")
}

#[cfg(not(feature = "parallel"))]
pub fn get_result(self) -> Option<PngData> {
self.eval_comparator.into_inner().get_result()
}

/// Set baseline image. It will be used only to measure minimum compression level required
pub fn set_baseline(&self, image: Arc<PngImage>) {
self.try_image_inner(image, 1.0, false)
Expand All @@ -84,6 +158,7 @@ impl Evaluator {
let best_candidate_size = self.best_candidate_size.clone();
// sends it off asynchronously for compression,
// but results will be collected via the message queue
#[cfg(feature = "parallel")]
let eval_send = self.eval_send.clone();
rayon::spawn(move || {
let filters_iter = STD_FILTERS.par_iter().with_max_len(1);
Expand All @@ -106,71 +181,28 @@ impl Evaluator {
) {
best_candidate_size.set_min(idat_data.len());
// the rest is shipped to the evavluation/collection thread
eval_send
.as_ref()
.expect("not finished yet")
.send(Candidate {
image: PngData {
idat_data,
raw: Arc::clone(&image),
},
bias,
filter,
is_reduction,
nth,
})
.expect("send");
let new = Candidate {
image: PngData {
idat_data,
raw: Arc::clone(&image),
},
bias,
filter,
is_reduction,
nth,
};

#[cfg(feature = "parallel")]
{
eval_send.send(new).expect("send");
}

#[cfg(not(feature = "parallel"))]
{
self.eval_comparator.borrow_mut().evaluate(new);
}
}
});
});
}

/// Main loop of evaluation thread
fn evaluate_images(from_channel: Receiver<Candidate>) -> Option<PngData> {
let mut best_result: Option<Candidate> = None;
// ends when the last sender is dropped
for new in from_channel.iter() {
// a tie-breaker is required to make evaluation deterministic
let is_best = if let Some(ref old) = best_result {
// ordering is important - later file gets to use bias over earlier, but not the other way
// (this way bias=0 replaces, but doesn't forbid later optimizations)
let new_len = (new.image.idat_data.len() as f64
* if new.nth > old.nth {
f64::from(new.bias)
} else {
1.0
}) as usize;
let old_len = (old.image.idat_data.len() as f64
* if new.nth < old.nth {
f64::from(old.bias)
} else {
1.0
}) as usize;
// choose smallest compressed, or if compresses the same, smallest uncompressed, or cheaper filter
let new = (
new_len,
new.image.raw.data.len(),
new.image.raw.ihdr.bit_depth,
new.filter,
new.nth,
);
let old = (
old_len,
old.image.raw.data.len(),
old.image.raw.ihdr.bit_depth,
old.filter,
old.nth,
);
// <= instead of < is important, because best_candidate_size has been set already,
// so the current result may be comparing its size with itself
new <= old
} else {
true
};
if is_best {
best_result = if new.is_reduction { Some(new) } else { None };
}
}
best_result.map(|res| res.image)
}
}
28 changes: 17 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,31 +779,37 @@ fn perform_reductions(
try_alpha_reductions(png, &opts.alphas, eval);
}

/// Keep track of processing timeout
pub(crate) struct Deadline {
struct DeadlineImp {
shssoichiro marked this conversation as resolved.
Show resolved Hide resolved
start: Instant,
timeout: Option<Duration>,
timeout: Duration,
print_message: AtomicBool,
}

/// Keep track of processing timeout
pub(crate) struct Deadline {
imp: Option<DeadlineImp>,
}

impl Deadline {
pub fn new(timeout: Option<Duration>, verbose: bool) -> Self {
Self {
start: Instant::now(),
timeout,
print_message: AtomicBool::new(verbose),
imp: timeout.map(|timeout| DeadlineImp {
start: Instant::now(),
timeout,
print_message: AtomicBool::new(verbose),
})
}
}

/// True if the timeout has passed, and no new work should be done.
///
/// If the verbose option is on, it also prints a timeout message once.
pub fn passed(&self) -> bool {
if let Some(timeout) = self.timeout {
let elapsed = self.start.elapsed();
if elapsed > timeout {
if self.print_message.load(Ordering::Relaxed) {
self.print_message.store(false, Ordering::Relaxed);
if let Some(imp) = &self.imp {
let elapsed = imp.start.elapsed();
if elapsed > imp.timeout {
if imp.print_message.load(Ordering::Relaxed) {
imp.print_message.store(false, Ordering::Relaxed);
eprintln!("Timed out after {} second(s)", elapsed.as_secs());
}
return true;
Expand Down