diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f825d0bf..5b872ecf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,9 +57,6 @@ jobs: echo '--- test portable' echo cargo test --release --features=portable - echo '--- test no-threads' - echo - cargo test --release --features=no-threads echo '--- test serde-secret' echo cargo test --release --features=serde-secret diff --git a/bindings/rust/Cargo.toml b/bindings/rust/Cargo.toml index ce6ba53e..1724e93a 100644 --- a/bindings/rust/Cargo.toml +++ b/bindings/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "blst" -version = "0.3.11" +version = "0.4.0" authors = ["sean-sn "] edition = "2018" license = "Apache-2.0" @@ -33,9 +33,6 @@ portable = [] # Enable ADX even if the host CPU doesn't support it. # Binary can be executed on Broadwell+ and Ryzen+ systems. force-adx = [] -# Suppress multi-threading. -# Engaged on wasm32 target architecture automatically. -no-threads = [] # Add support for serializing SecretKey, not suitable for production. serde-secret = ["serde"] @@ -46,11 +43,9 @@ glob = "0.3" [dependencies] zeroize = { version = "^1.1", features = ["zeroize_derive"] } +rayon = "1.8.1" serde = { version = "1.0.152", optional = true } -[target.'cfg(not(any(target_arch="wasm32", target_os="none", target_os="unknown", target_os="uefi")))'.dependencies] -threadpool = "^1.8.1" - [dev-dependencies] rand = "0.8" rand_chacha = "0.3" diff --git a/bindings/rust/build.rs b/bindings/rust/build.rs index 44bf2644..c3c67bb7 100644 --- a/bindings/rust/build.rs +++ b/bindings/rust/build.rs @@ -49,9 +49,6 @@ fn main() { if !target_no_std { println!("cargo:rustc-cfg=feature=\"std\""); - if target_arch.eq("wasm32") || target_os.eq("unknown") { - println!("cargo:rustc-cfg=feature=\"no-threads\""); - } } println!("cargo:rerun-if-env-changed=BLST_TEST_NO_STD"); diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 9083bd8a..0ddb74f2 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -18,78 +18,11 @@ use core::ptr; use zeroize::Zeroize; #[cfg(feature = "std")] -use std::sync::{atomic::*, mpsc::channel, Arc}; +use std::sync::{atomic::*, mpsc::channel, Mutex}; #[cfg(feature = "serde")] use serde::{Deserialize, Deserializer, Serialize, Serializer}; -trait ThreadPoolExt { - fn joined_execute<'any, F>(&self, job: F) - where - F: FnOnce() + Send + 'any; -} - -#[cfg(all(not(feature = "no-threads"), feature = "std"))] -mod mt { - use super::*; - use core::mem::transmute; - use std::sync::{Mutex, Once}; - use threadpool::ThreadPool; - - pub fn da_pool() -> ThreadPool { - static INIT: Once = Once::new(); - static mut POOL: *const Mutex = - 0 as *const Mutex; - - INIT.call_once(|| { - let pool = Mutex::new(ThreadPool::default()); - unsafe { POOL = transmute(Box::new(pool)) }; - }); - unsafe { (*POOL).lock().unwrap().clone() } - } - - type Thunk<'any> = Box; - - impl ThreadPoolExt for ThreadPool { - fn joined_execute<'scope, F>(&self, job: F) - where - F: FnOnce() + Send + 'scope, - { - // Bypass 'lifetime limitations by brute force. It works, - // because we explicitly join the threads... - self.execute(unsafe { - transmute::, Thunk<'static>>(Box::new(job)) - }) - } - } -} - -#[cfg(all(feature = "no-threads", feature = "std"))] -mod mt { - use super::*; - - pub struct EmptyPool {} - - pub fn da_pool() -> EmptyPool { - EmptyPool {} - } - - impl EmptyPool { - pub fn max_count(&self) -> usize { - 1 - } - } - - impl ThreadPoolExt for EmptyPool { - fn joined_execute<'scope, F>(&self, job: F) - where - F: FnOnce() + Send + 'scope, - { - job() - } - } -} - include!("bindings.rs"); impl PartialEq for blst_p1 { @@ -161,11 +94,16 @@ impl blst_fp12 { if n_elems != p.len() || n_elems == 0 { panic!("inputs' lengths mismatch"); } - let qs: [*const _; 2] = [&q[0], ptr::null()]; - let ps: [*const _; 2] = [&p[0], ptr::null()]; + let qs = [q.as_ptr(), ptr::null()]; + let ps = [p.as_ptr(), ptr::null()]; let mut out = MaybeUninit::::uninit(); unsafe { - blst_miller_loop_n(out.as_mut_ptr(), &qs[0], &ps[0], n_elems); + blst_miller_loop_n( + out.as_mut_ptr(), + qs.as_ptr(), + ps.as_ptr(), + n_elems, + ); out.assume_init() } } @@ -177,33 +115,33 @@ impl blst_fp12 { panic!("inputs' lengths mismatch"); } - let pool = mt::da_pool(); - - let mut n_workers = pool.max_count(); + let n_workers = rayon::current_num_threads(); if n_workers == 1 { - let qs: [*const _; 2] = [&q[0], ptr::null()]; - let ps: [*const _; 2] = [&p[0], ptr::null()]; + let qs = [q.as_ptr(), ptr::null()]; + let ps = [p.as_ptr(), ptr::null()]; let mut out = MaybeUninit::::uninit(); unsafe { - blst_miller_loop_n(out.as_mut_ptr(), &qs[0], &ps[0], n_elems); + blst_miller_loop_n( + out.as_mut_ptr(), + qs.as_ptr(), + ps.as_ptr(), + n_elems, + ); return out.assume_init(); } } - let (tx, rx) = channel(); - let counter = Arc::new(AtomicUsize::new(0)); - + let ret = Mutex::new(None::); + let counter = AtomicUsize::new(0); let stride = core::cmp::min((n_elems + n_workers - 1) / n_workers, 16); - n_workers = core::cmp::min((n_elems + stride - 1) / stride, n_workers); - for _ in 0..n_workers { - let tx = tx.clone(); - let counter = counter.clone(); - pool.joined_execute(move || { + rayon::scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| { + let mut processed = 0; let mut acc = blst_fp12::default(); let mut tmp = MaybeUninit::::uninit(); - let mut qs: [*const _; 2] = [ptr::null(), ptr::null()]; - let mut ps: [*const _; 2] = [ptr::null(), ptr::null()]; + let mut qs = [ptr::null(), ptr::null()]; + let mut ps = [ptr::null(), ptr::null()]; loop { let work = counter.fetch_add(stride, Ordering::Relaxed); @@ -214,21 +152,34 @@ impl blst_fp12 { qs[0] = &q[work]; ps[0] = &p[work]; unsafe { - blst_miller_loop_n(tmp.as_mut_ptr(), &qs[0], &ps[0], n); + blst_miller_loop_n( + tmp.as_mut_ptr(), + qs.as_ptr(), + ps.as_ptr(), + n, + ); acc *= tmp.assume_init(); } + + processed += 1; } - tx.send(acc).expect("disaster"); + if processed > 0 { + let mut ret = ret.lock().unwrap(); + match ret.as_mut() { + Some(ret) => { + *ret *= acc; + } + None => { + ret.replace(acc); + } + } + } }); - } - - let mut acc = rx.recv().unwrap(); - for _ in 1..n_workers { - acc *= rx.recv().unwrap(); - } + }); - acc + let mut ret = ret.lock().unwrap(); + ret.take().unwrap() } pub fn final_exp(&self) -> Self { @@ -278,7 +229,7 @@ impl blst_scalar { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Pairing { v: Box<[u64]>, } @@ -1107,18 +1058,14 @@ macro_rules! sig_variant_impl { // TODO - check msg uniqueness? - let pool = mt::da_pool(); - let (tx, rx) = channel(); - let counter = Arc::new(AtomicUsize::new(0)); - let valid = Arc::new(AtomicBool::new(true)); - - let n_workers = core::cmp::min(pool.max_count(), n_elems); - for _ in 0..n_workers { - let tx = tx.clone(); - let counter = counter.clone(); - let valid = valid.clone(); + let counter = AtomicUsize::new(0); + let valid = AtomicBool::new(true); + let acc = Mutex::new(None::); + let mut gtsig = blst_fp12::default(); - pool.joined_execute(move || { + rayon::scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| { + let mut processed = 0; let mut pairing = Pairing::new($hash_or_encode, dst); while valid.load(Ordering::Relaxed) { @@ -1126,6 +1073,7 @@ macro_rules! sig_variant_impl { if work >= n_elems { break; } + if pairing.aggregate( &pks[work].point, pks_validate, @@ -1135,37 +1083,49 @@ macro_rules! sig_variant_impl { &[], ) != BLST_ERROR::BLST_SUCCESS { - valid.store(false, Ordering::Relaxed); + valid.store(false, Ordering::Release); break; } + + processed += 1; } - if valid.load(Ordering::Relaxed) { + + if processed > 0 && valid.load(Ordering::Relaxed) { pairing.commit(); + + let mut acc = acc.lock().unwrap(); + match acc.as_mut() { + Some(acc) => { + acc.merge(&pairing); + } + None => { + acc.replace(pairing); + } + } } - tx.send(pairing).expect("disaster"); }); - } - - if sig_groupcheck && valid.load(Ordering::Relaxed) { - match self.validate(false) { - Err(_err) => valid.store(false, Ordering::Relaxed), - _ => (), - } - } + scope.spawn(|_scope| { + if sig_groupcheck && self.validate(false).is_err() { + valid.store(false, Ordering::Release); + } + }); + scope.spawn(|_scope| { + Pairing::aggregated(&mut gtsig, &self.point); + }); + }); - let mut gtsig = blst_fp12::default(); - if valid.load(Ordering::Relaxed) { - Pairing::aggregated(&mut gtsig, &self.point); + if !valid.load(Ordering::Acquire) { + return BLST_ERROR::BLST_VERIFY_FAIL; } - let mut acc = rx.recv().unwrap(); - for _ in 1..n_workers { - acc.merge(&rx.recv().unwrap()); - } + let acc = match acc.lock().unwrap().take() { + Some(acc) => acc, + None => { + return BLST_ERROR::BLST_VERIFY_FAIL; + } + }; - if valid.load(Ordering::Relaxed) - && acc.finalverify(Some(>sig)) - { + if acc.finalverify(Some(>sig)) { BLST_ERROR::BLST_SUCCESS } else { BLST_ERROR::BLST_VERIFY_FAIL @@ -1231,18 +1191,13 @@ macro_rules! sig_variant_impl { // TODO - check msg uniqueness? - let pool = mt::da_pool(); - let (tx, rx) = channel(); - let counter = Arc::new(AtomicUsize::new(0)); - let valid = Arc::new(AtomicBool::new(true)); - - let n_workers = core::cmp::min(pool.max_count(), n_elems); - for _ in 0..n_workers { - let tx = tx.clone(); - let counter = counter.clone(); - let valid = valid.clone(); + let counter = AtomicUsize::new(0); + let valid = AtomicBool::new(true); + let acc = Mutex::new(None::); - pool.joined_execute(move || { + rayon::scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| { + let mut processed = 0; let mut pairing = Pairing::new($hash_or_encode, dst); // TODO - engage multi-point mul-n-add for larger @@ -1264,23 +1219,36 @@ macro_rules! sig_variant_impl { &[], ) != BLST_ERROR::BLST_SUCCESS { - valid.store(false, Ordering::Relaxed); + valid.store(false, Ordering::Release); break; } + + processed += 1; } - if valid.load(Ordering::Relaxed) { + if processed > 0 && valid.load(Ordering::Relaxed) { pairing.commit(); + + let mut acc = acc.lock().unwrap(); + match acc.as_mut() { + Some(acc) => { + acc.merge(&pairing); + } + None => { + acc.replace(pairing); + } + } } - tx.send(pairing).expect("disaster"); - }); - } + }) + }); - let mut acc = rx.recv().unwrap(); - for _ in 1..n_workers { - acc.merge(&rx.recv().unwrap()); - } + let acc = match acc.lock().unwrap().take() { + Some(acc) => acc, + None => { + return BLST_ERROR::BLST_VERIFY_FAIL; + } + }; - if valid.load(Ordering::Relaxed) && acc.finalverify(None) { + if valid.load(Ordering::Acquire) && acc.finalverify(None) { BLST_ERROR::BLST_SUCCESS } else { BLST_ERROR::BLST_VERIFY_FAIL diff --git a/bindings/rust/src/pippenger-no_std.rs b/bindings/rust/src/pippenger-no_std.rs index c316e87a..284ab8d6 100644 --- a/bindings/rust/src/pippenger-no_std.rs +++ b/bindings/rust/src/pippenger-no_std.rs @@ -51,11 +51,12 @@ macro_rules! pippenger_mult_impl { let mut ret = Self { points: Vec::with_capacity(npoints), }; - #[allow(clippy::uninit_vec)] - unsafe { ret.points.set_len(npoints) }; - let p: [*const $point; 2] = [&points[0], ptr::null()]; - unsafe { $to_affines(&mut ret.points[0], &p[0], npoints) }; + let p = [points.as_ptr(), ptr::null()]; + unsafe { + $to_affines(ret.points.as_mut_ptr(), p.as_ptr(), npoints); + ret.points.set_len(npoints); + } ret } @@ -67,23 +68,20 @@ macro_rules! pippenger_mult_impl { panic!("scalars length mismatch"); } - let p: [*const $point_affine; 2] = - [&self.points[0], ptr::null()]; - let s: [*const u8; 2] = [&scalars[0], ptr::null()]; + let p = [self.points.as_ptr(), ptr::null()]; + let s = [scalars.as_ptr(), ptr::null()]; let mut ret = <$point>::default(); unsafe { let mut scratch: Vec = Vec::with_capacity($scratch_sizeof(npoints) / 8); - #[allow(clippy::uninit_vec)] - scratch.set_len(scratch.capacity()); $multi_scalar_mult( &mut ret, - &p[0], + p.as_ptr(), npoints, - &s[0], + s.as_ptr(), nbits, - &mut scratch[0], + scratch.as_mut_ptr(), ); } ret @@ -92,9 +90,9 @@ macro_rules! pippenger_mult_impl { pub fn add(&self) -> $point { let npoints = self.points.len(); - let p: [*const _; 2] = [&self.points[0], ptr::null()]; + let p = [self.points.as_ptr(), ptr::null()]; let mut ret = <$point>::default(); - unsafe { $add(&mut ret, &p[0], npoints) }; + unsafe { $add(&mut ret, p.as_ptr(), npoints) }; ret } diff --git a/bindings/rust/src/pippenger-test_mod.rs b/bindings/rust/src/pippenger-test_mod.rs index 4874a12e..7f338df6 100644 --- a/bindings/rust/src/pippenger-test_mod.rs +++ b/bindings/rust/src/pippenger-test_mod.rs @@ -25,8 +25,7 @@ macro_rules! pippenger_test_mod { let mut scalars = Box::new([0u8; nbytes * npoints]); ChaCha20Rng::from_seed([0u8; 32]).fill_bytes(scalars.as_mut()); - let mut points: Vec<$point> = Vec::with_capacity(npoints); - unsafe { points.set_len(points.capacity()) }; + let mut points = vec![<$point>::default(); npoints]; let mut naive = <$point>::default(); for i in 0..npoints { @@ -61,8 +60,7 @@ macro_rules! pippenger_test_mod { let mut scalars = Box::new([0u8; nbytes * npoints]); ChaCha20Rng::from_seed([0u8; 32]).fill_bytes(scalars.as_mut()); - let mut points: Vec<$point> = Vec::with_capacity(npoints); - unsafe { points.set_len(points.capacity()) }; + let mut points = vec![<$point>::default(); npoints]; let mut naive = <$point>::default(); for i in 0..npoints { diff --git a/bindings/rust/src/pippenger.rs b/bindings/rust/src/pippenger.rs index bdaec95a..a378983b 100644 --- a/bindings/rust/src/pippenger.rs +++ b/bindings/rust/src/pippenger.rs @@ -5,7 +5,6 @@ use core::num::Wrapping; use core::ops::{Index, IndexMut}; use core::slice::SliceIndex; -use std::sync::Barrier; struct tile { x: usize, @@ -74,41 +73,53 @@ macro_rules! pippenger_mult_impl { let mut ret = Self { points: Vec::with_capacity(npoints), }; - unsafe { ret.points.set_len(npoints) }; - let pool = mt::da_pool(); - let ncpus = pool.max_count(); + let ncpus = rayon::current_num_threads(); if ncpus < 2 || npoints < 768 { - let p: [*const $point; 2] = [&points[0], ptr::null()]; - unsafe { $to_affines(&mut ret.points[0], &p[0], npoints) }; + let p = [points.as_ptr(), ptr::null()]; + unsafe { + $to_affines( + ret.points.as_mut_ptr(), + p.as_ptr(), + ret.points.capacity(), + ); + ret.points.set_len(ret.points.capacity()); + }; return ret; } let mut nslices = (npoints + 511) / 512; nslices = core::cmp::min(nslices, ncpus); - let wg = Arc::new((Barrier::new(2), AtomicUsize::new(nslices))); + // TODO: Use pointer arithmetic once Rust 1.75 can be used + #[allow(clippy::uninit_vec)] + unsafe { + ret.points.set_len(ret.points.capacity()); + } let (mut delta, mut rem) = (npoints / nslices + 1, Wrapping(npoints % nslices)); - let mut x = 0usize; - while x < npoints { - let out = &mut ret.points[x]; - let inp = &points[x]; - - delta -= (rem == Wrapping(0)) as usize; - rem -= Wrapping(1); - x += delta; - - let wg = wg.clone(); - pool.joined_execute(move || { - let p: [*const $point; 2] = [inp, ptr::null()]; - unsafe { $to_affines(out, &p[0], delta) }; - if wg.1.fetch_sub(1, Ordering::AcqRel) == 1 { - wg.0.wait(); + rayon::scope(|scope| { + let mut ret_points = ret.points.as_mut_slice(); + let mut points = points; + while !points.is_empty() { + if rem == Wrapping(0) { + delta -= 1; } - }); - } - wg.0.wait(); + rem -= Wrapping(1); + + let out; + (out, ret_points) = ret_points.split_at_mut(delta); + let inp; + (inp, points) = points.split_at(delta); + + scope.spawn(move |_scope| { + let p = [inp.as_ptr(), ptr::null()]; + unsafe { + $to_affines(out.as_mut_ptr(), p.as_ptr(), delta) + }; + }); + } + }); ret } @@ -121,39 +132,39 @@ macro_rules! pippenger_mult_impl { panic!("scalars length mismatch"); } - let pool = mt::da_pool(); - let ncpus = pool.max_count(); + let ncpus = rayon::current_num_threads(); if ncpus < 2 || npoints < 32 { - let p: [*const $point_affine; 2] = - [&self.points[0], ptr::null()]; - let s: [*const u8; 2] = [&scalars[0], ptr::null()]; + let p = [self.points.as_ptr(), ptr::null()]; + let s = [scalars.as_ptr(), ptr::null()]; + let mut ret = <$point>::default(); unsafe { let mut scratch: Vec = Vec::with_capacity($scratch_sizeof(npoints) / 8); - #[allow(clippy::uninit_vec)] - scratch.set_len(scratch.capacity()); - let mut ret = <$point>::default(); + $multi_scalar_mult( &mut ret, - &p[0], + p.as_ptr(), npoints, - &s[0], + s.as_ptr(), nbits, - &mut scratch[0], + scratch.as_mut_ptr(), ); - return ret; } + return ret; } let (nx, ny, window) = breakdown(nbits, pippenger_window_size(npoints), ncpus); // |grid[]| holds "coordinates" and place for result - let mut grid: Vec<(tile, Cell<$point>)> = - Vec::with_capacity(nx * ny); + let mut grid = + Vec::<(tile, Cell<$point>)>::with_capacity(nx * ny); + // TODO: Use pointer arithmetic once Rust 1.75 can be used #[allow(clippy::uninit_vec)] - unsafe { grid.set_len(grid.capacity()) }; + unsafe { + grid.set_len(grid.capacity()); + } let dx = npoints / nx; let mut y = window * (ny - 1); let mut total = 0usize; @@ -181,22 +192,15 @@ macro_rules! pippenger_mult_impl { let points = &self.points[..]; let sz = unsafe { $scratch_sizeof(0) / 8 }; - let mut row_sync: Vec = Vec::with_capacity(ny); - row_sync.resize_with(ny, Default::default); - let row_sync = Arc::new(row_sync); - let counter = Arc::new(AtomicUsize::new(0)); + let mut row_sync = Vec::with_capacity(ny); + row_sync.resize_with(ny, AtomicUsize::default); + let counter = AtomicUsize::new(0); let (tx, rx) = channel(); - let n_workers = core::cmp::min(ncpus, total); - for _ in 0..n_workers { - let tx = tx.clone(); - let counter = counter.clone(); - let row_sync = row_sync.clone(); - - pool.joined_execute(move || { + rayon::scope(|scope| { + scope.spawn_broadcast(move |_scope, _ctx| { let mut scratch = vec![0u64; sz << (window - 1)]; - let mut p: [*const $point_affine; 2] = - [ptr::null(), ptr::null()]; - let mut s: [*const u8; 2] = [ptr::null(), ptr::null()]; + let mut p = [ptr::null(), ptr::null()]; + let mut s = [ptr::null(), ptr::null()]; loop { let work = counter.fetch_add(1, Ordering::Relaxed); @@ -211,11 +215,11 @@ macro_rules! pippenger_mult_impl { unsafe { $tile_mult( grid[work].1.as_ptr(), - &p[0], + p.as_ptr(), grid[work].0.dx, - &s[0], + s.as_ptr(), nbits, - &mut scratch[0], + scratch.as_mut_ptr(), y, window, ); @@ -228,7 +232,7 @@ macro_rules! pippenger_mult_impl { } } }); - } + }); let mut ret = <$point>::default(); let mut rows = vec![false; ny]; @@ -265,29 +269,26 @@ macro_rules! pippenger_mult_impl { pub fn add(&self) -> $point { let npoints = self.points.len(); - let pool = mt::da_pool(); - let ncpus = pool.max_count(); + let ncpus = rayon::current_num_threads(); if ncpus < 2 || npoints < 384 { - let p: [*const _; 2] = [&self.points[0], ptr::null()]; + let p = [self.points.as_ptr(), ptr::null()]; let mut ret = <$point>::default(); - unsafe { $add(&mut ret, &p[0], npoints) }; + unsafe { $add(&mut ret, p.as_ptr(), npoints) }; return ret; } - let (tx, rx) = channel(); - let counter = Arc::new(AtomicUsize::new(0)); + let ret = Mutex::new(None::<$point>); + let counter = AtomicUsize::new(0); let nchunks = (npoints + 255) / 256; let chunk = npoints / nchunks + 1; - let n_workers = core::cmp::min(ncpus, nchunks); - for _ in 0..n_workers { - let tx = tx.clone(); - let counter = counter.clone(); - - pool.joined_execute(move || { + rayon::scope(|scope| { + let ret = &ret; + scope.spawn_broadcast(move |_scope, _ctx| { + let mut processed = 0; let mut acc = <$point>::default(); let mut chunk = chunk; - let mut p: [*const _; 2] = [ptr::null(), ptr::null()]; + let mut p = [ptr::null(), ptr::null()]; loop { let work = @@ -301,22 +302,27 @@ macro_rules! pippenger_mult_impl { } unsafe { let mut t = MaybeUninit::<$point>::uninit(); - $add(t.as_mut_ptr(), &p[0], chunk); + $add(t.as_mut_ptr(), p.as_ptr(), chunk); $add_or_double(&mut acc, &acc, t.as_ptr()); }; + processed += 1; } - tx.send(acc).expect("disaster"); - }); - } - - let mut ret = rx.recv().unwrap(); - for _ in 1..n_workers { - unsafe { - $add_or_double(&mut ret, &ret, &rx.recv().unwrap()) - }; - } + if processed > 0 { + let mut ret = ret.lock().unwrap(); + match ret.as_mut() { + Some(ret) => { + unsafe { $add_or_double(ret, ret, &acc) }; + } + None => { + ret.replace(acc); + } + } + } + }) + }); - ret + let mut ret = ret.lock().unwrap(); + ret.take().unwrap() } }