Skip to content

Commit

Permalink
Wait for sampling thread to exit (#391)
Browse files Browse the repository at this point in the history
  • Loading branch information
benfred authored May 16, 2021
1 parent 37b60cc commit 1315f07
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::version::Version;

pub struct Sampler {
pub version: Option<Version>,
rx: Receiver<Sample>,
rx: Option<Receiver<Sample>>,
sampling_thread: Option<thread::JoinHandle<()>>,
}

pub struct Sample {
Expand All @@ -39,7 +40,7 @@ impl Sampler {
let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
let (initialized_tx, initialized_rx): (Sender<Result<Version, Error>>, Receiver<Result<Version, Error>>) = mpsc::channel();
let config = config.clone();
thread::spawn(move || {
let sampling_thread = thread::spawn(move || {
// We need to create this object inside the thread here since PythonSpy objects don't
// have the Send trait implemented on linux
let mut spy = match PythonSpy::retry_new(pid, &config, 20) {
Expand Down Expand Up @@ -77,7 +78,7 @@ impl Sampler {
});

let version = initialized_rx.recv()??;
Ok(Sampler{rx, version: Some(version)})
Ok(Sampler{rx: Some(rx), version: Some(version), sampling_thread: Some(sampling_thread)})
}

/// Creates a new sampler object that samples any python process in the
Expand Down Expand Up @@ -144,7 +145,7 @@ impl Sampler {
// Create a new thread to generate samples
let config = config.clone();
let (tx, rx): (Sender<Sample>, Receiver<Sample>) = mpsc::channel();
std::thread::spawn(move || {
let sampling_thread = std::thread::spawn(move || {
for sleep in Timer::new(config.sampling_rate as f64) {
let mut traces = Vec::new();
let mut sampling_errors = None;
Expand Down Expand Up @@ -198,14 +199,24 @@ impl Sampler {
}
}
});
Ok(Sampler{rx, version: None})

Ok(Sampler{rx: Some(rx), version: None, sampling_thread: Some(sampling_thread)})
}
}

impl Iterator for Sampler {
type Item = Sample;
fn next(&mut self) -> Option<Self::Item> {
self.rx.recv().ok()
self.rx.as_ref().unwrap().recv().ok()
}
}

impl Drop for Sampler {
fn drop(&mut self) {
self.rx = None;
if let Some(t) = self.sampling_thread.take() {
t.join().unwrap();
}
}
}

Expand Down

0 comments on commit 1315f07

Please sign in to comment.