-
Notifications
You must be signed in to change notification settings - Fork 30
[WIP] Implementation of atomic_wait / atomic_notify #589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev/wasm-threads
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,7 +17,7 @@ use alloc::vec; | |||||||||||||
| use alloc::vec::Vec; | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| use portable_atomic::{AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering}; | ||||||||||||||
| use portable_atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering}; | ||||||||||||||
|
|
||||||||||||||
| use crate::error::*; | ||||||||||||||
| use crate::module::*; | ||||||||||||||
|
|
@@ -61,6 +61,7 @@ pub struct Store<'m> { | |||||||||||||
| // functions in `funcs` is stored to limit normal linking to that part. | ||||||||||||||
| func_default: Option<(&'m str, usize)>, | ||||||||||||||
| threads: Vec<Continuation<'m>>, | ||||||||||||||
| lock: spin::Mutex<()>, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] | ||||||||||||||
|
|
@@ -102,6 +103,7 @@ impl<'m> Default for Store<'m> { | |||||||||||||
| funcs: vec![], | ||||||||||||||
| func_default: None, | ||||||||||||||
| threads: vec![], | ||||||||||||||
| lock: spin::Mutex::new(()), | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
@@ -120,42 +122,45 @@ impl<'m> Store<'m> { | |||||||||||||
| pub fn instantiate( | ||||||||||||||
| &mut self, module: Module<'m>, memory: &'m mut [u8], | ||||||||||||||
| ) -> Result<InstId, Error> { | ||||||||||||||
| let lock = self.lock.lock(); | ||||||||||||||
| let inst_id = self.insts.len(); | ||||||||||||||
| self.insts.push(Instance::default()); | ||||||||||||||
| self.last_inst().module = module; | ||||||||||||||
| for import in self.last_inst().module.imports() { | ||||||||||||||
| let type_ = import.type_(&self.last_inst().module); | ||||||||||||||
| drop(lock); | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason not to take the lock for the whole instantiation? It seems it would simplify things and we don't need concurrent instantiation. |
||||||||||||||
| self.last_inst(inst_id).module = module; | ||||||||||||||
| for import in self.last_inst(inst_id).module.imports() { | ||||||||||||||
| let type_ = import.type_(&self.last_inst(inst_id).module); | ||||||||||||||
| let id = self.resolve(&import, type_)?; | ||||||||||||||
| match import.desc { | ||||||||||||||
| ImportDesc::Func(_) => self.last_inst().funcs.ext.push(id), | ||||||||||||||
| ImportDesc::Table(_) => self.last_inst().tables.ext.push(id), | ||||||||||||||
| ImportDesc::Mem(_) => self.last_inst().mems.ext.push(id), | ||||||||||||||
| ImportDesc::Global(_) => self.last_inst().globals.ext.push(id), | ||||||||||||||
| ImportDesc::Func(_) => self.last_inst(inst_id).funcs.ext.push(id), | ||||||||||||||
| ImportDesc::Table(_) => self.last_inst(inst_id).tables.ext.push(id), | ||||||||||||||
| ImportDesc::Mem(_) => self.last_inst(inst_id).mems.ext.push(id), | ||||||||||||||
| ImportDesc::Global(_) => self.last_inst(inst_id).globals.ext.push(id), | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Table) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Table) { | ||||||||||||||
| for _ in 0 .. parser.parse_vec().into_ok() { | ||||||||||||||
| (self.last_inst().tables.int).push(Table::new(parser.parse_tabletype().into_ok())); | ||||||||||||||
| (self.last_inst(inst_id).tables.int) | ||||||||||||||
| .push(Table::new(parser.parse_tabletype().into_ok())); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Memory) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Memory) { | ||||||||||||||
| match parser.parse_vec().into_ok() { | ||||||||||||||
| 0 => (), | ||||||||||||||
| 1 => { | ||||||||||||||
| let limits = parser.parse_memtype().into_ok(); | ||||||||||||||
| self.last_inst().mems.int.init(memory, limits)?; | ||||||||||||||
| self.last_inst(inst_id).mems.int.init(memory, limits)?; | ||||||||||||||
| } | ||||||||||||||
| _ => unimplemented!(), | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Global) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Global) { | ||||||||||||||
| for _ in 0 .. parser.parse_vec().into_ok() { | ||||||||||||||
| parser.parse_globaltype().into_ok(); | ||||||||||||||
| let value = Thread::const_expr(self, inst_id, &mut parser); | ||||||||||||||
| self.last_inst().globals.int.push(Global::new(value)); | ||||||||||||||
| self.last_inst(inst_id).globals.int.push(Global::new(value)); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Element) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Element) { | ||||||||||||||
| for _ in 0 .. parser.parse_vec().into_ok() { | ||||||||||||||
| // TODO: This is inefficient because we only need init for active segments. | ||||||||||||||
| let mut elem = ComputeElem::new(self, inst_id); | ||||||||||||||
|
|
@@ -171,10 +176,10 @@ impl<'m> Store<'m> { | |||||||||||||
| } | ||||||||||||||
| ElemMode::Declarative => true, | ||||||||||||||
| }; | ||||||||||||||
| self.last_inst().elems.push(drop); | ||||||||||||||
| self.last_inst(inst_id).elems.push(drop); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Data) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Data) { | ||||||||||||||
| for _ in 0 .. parser.parse_vec().into_ok() { | ||||||||||||||
| let mut data = ComputeData::new(self, inst_id); | ||||||||||||||
| parser.parse_data(&mut data).into_ok(); | ||||||||||||||
|
|
@@ -188,10 +193,10 @@ impl<'m> Store<'m> { | |||||||||||||
| true | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
| self.last_inst().datas.push(drop); | ||||||||||||||
| self.last_inst(inst_id).datas.push(drop); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| if let Some(mut parser) = self.last_inst().module.section(SectionId::Start) { | ||||||||||||||
| if let Some(mut parser) = self.last_inst(inst_id).module.section(SectionId::Start) { | ||||||||||||||
| let x = parser.parse_funcidx().into_ok(); | ||||||||||||||
| let ptr = self.func_ptr(inst_id, x); | ||||||||||||||
| let inst_id = ptr.instance().unwrap_wasm(); | ||||||||||||||
|
|
@@ -213,6 +218,8 @@ impl<'m> Store<'m> { | |||||||||||||
| pub fn invoke<'a>( | ||||||||||||||
| &'a mut self, inst: InstId, name: &str, args: Vec<Val>, | ||||||||||||||
| ) -> Result<RunResult<'a, 'm>, Error> { | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK invoke fn {:?}, instid: {:?}", name, inst); | ||||||||||||||
| let inst_id = self.inst_id(inst)?; | ||||||||||||||
| let inst = &self.insts[inst_id]; | ||||||||||||||
| let ptr = match inst.module.export(name).ok_or_else(not_found)? { | ||||||||||||||
|
|
@@ -500,8 +507,8 @@ struct Continuation<'m> { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| impl<'m> Store<'m> { | ||||||||||||||
| fn last_inst(&mut self) -> &mut Instance<'m> { | ||||||||||||||
| self.insts.last_mut().unwrap() | ||||||||||||||
| fn last_inst(&mut self, inst_id: usize) -> &mut Instance<'m> { | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An |
||||||||||||||
| &mut self.insts[inst_id] | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn inst_id(&self, inst: InstId) -> Result<usize, Error> { | ||||||||||||||
|
|
@@ -556,6 +563,8 @@ impl<'m> Store<'m> { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn resolve(&mut self, import: &Import<'m>, imp_type_: ExternType<'m>) -> Result<Ptr, Error> { | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK import {:?}", import); | ||||||||||||||
| let host_name = HostName { module: import.module, name: import.name }; | ||||||||||||||
| let mut found = None; | ||||||||||||||
| let funcs_len = match self.func_default { | ||||||||||||||
|
|
@@ -822,6 +831,8 @@ impl<'m> Thread<'m> { | |||||||||||||
| } | ||||||||||||||
| LocalGet(x) => { | ||||||||||||||
| let v = self.frame().locals[x as usize]; | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK local read: {:?}, expected: {:?}", x, v); | ||||||||||||||
| self.push_value(v); | ||||||||||||||
| } | ||||||||||||||
| LocalSet(x) => { | ||||||||||||||
|
|
@@ -979,12 +990,18 @@ impl<'m> Thread<'m> { | |||||||||||||
| AtomicFence => (), | ||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| AtomicLoad(n, m) => { | ||||||||||||||
| self.atomic_load(store.mem(inst_id, 0), NumType::i(n), n.into(), m)? | ||||||||||||||
| match self.atomic_load(store.mem(inst_id, 0), NumType::i(n), n.into(), m) { | ||||||||||||||
| Ok(_) => Ok(()), | ||||||||||||||
| Err(e) => Err(e), | ||||||||||||||
| }? | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| AtomicLoad_(b, m) => { | ||||||||||||||
| self.atomic_load(store.mem(inst_id, 0), NumType::i(b.into()), b.into(), m)? | ||||||||||||||
| match self.atomic_load(store.mem(inst_id, 0), NumType::i(b.into()), b.into(), m) { | ||||||||||||||
| Ok(_) => Ok(()), | ||||||||||||||
| Err(e) => Err(e), | ||||||||||||||
| }? | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
|
|
@@ -1148,9 +1165,9 @@ impl<'m> Thread<'m> { | |||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn mem_slice<'a>( | ||||||||||||||
| &mut self, mem: &'a mut Memory<'m>, m: MemArg, i: u32, len: usize, align: bool, | ||||||||||||||
| ) -> Option<&'a mut [u8]> { | ||||||||||||||
| fn mem_slice_addr<'a>( | ||||||||||||||
| &mut self, mem: &'a Memory<'m>, m: MemArg, i: u32, len: usize, align: bool, | ||||||||||||||
| ) -> Option<u32> { | ||||||||||||||
| let ea = i.checked_add(m.offset)?; | ||||||||||||||
| if ea.checked_add(len as u32)? > mem.len() { | ||||||||||||||
| memory_too_small(ea as usize, len, mem); | ||||||||||||||
|
|
@@ -1159,6 +1176,13 @@ impl<'m> Thread<'m> { | |||||||||||||
| if align && ea % len as u32 != 0 { | ||||||||||||||
| return None; | ||||||||||||||
| } | ||||||||||||||
| Some(ea) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn mem_slice<'a>( | ||||||||||||||
| &mut self, mem: &'a mut Memory<'m>, m: MemArg, i: u32, len: usize, align: bool, | ||||||||||||||
| ) -> Option<&'a mut [u8]> { | ||||||||||||||
| let ea = self.mem_slice_addr(mem, m, i, len, align)?; | ||||||||||||||
| Some(&mut mem.data[ea as usize ..][.. len]) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -1194,6 +1218,8 @@ impl<'m> Thread<'m> { | |||||||||||||
| (NumType::F64, 64, _) => convert!(F64, u64, u64), | ||||||||||||||
| _ => unreachable!(), | ||||||||||||||
| }; | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK load: {:?}, val: {:?}", i, c); | ||||||||||||||
| self.push_value(c); | ||||||||||||||
| Ok(()) | ||||||||||||||
| } | ||||||||||||||
|
|
@@ -1431,11 +1457,13 @@ impl<'m> Thread<'m> { | |||||||||||||
| let i = self.pop_value().unwrap_i32(); | ||||||||||||||
|
|
||||||||||||||
| // Trap if memory access is OOB. | ||||||||||||||
| let _mem = match self.mem_slice(mem, m, i, 4, true) { | ||||||||||||||
| let _mem = match self.mem_slice_addr(mem, m, i, 4, true) { | ||||||||||||||
| None => return Err(trap()), | ||||||||||||||
| Some(x) => x, | ||||||||||||||
| }; | ||||||||||||||
| self.push_value(count); | ||||||||||||||
|
|
||||||||||||||
| let notified = mem.share_data.notify(_mem, count.unwrap_i32()); | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||||||||||||||
| self.push_value(Val::I32(notified)); | ||||||||||||||
| Ok(()) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -1448,23 +1476,36 @@ impl<'m> Thread<'m> { | |||||||||||||
| } | ||||||||||||||
| let _timeout = self.pop_value().unwrap_i64(); | ||||||||||||||
| let expected = self.pop_value(); | ||||||||||||||
| self.atomic_load(mem, t, n, m)?; | ||||||||||||||
| let ea = self.atomic_load(mem, t, n, m)?; | ||||||||||||||
| let read = self.pop_value(); | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK wait value read: {:?}, expected: {:?}", read, expected); | ||||||||||||||
| if read != expected { | ||||||||||||||
| self.push_value(Val::I32(1)); | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK wait value read: {:?} != expected: {:?}", read, expected); | ||||||||||||||
| return Ok(()); | ||||||||||||||
| } | ||||||||||||||
| let waiting_thread = mem.share_data.wait(ea); | ||||||||||||||
| while !waiting_thread.notify.load(portable_atomic::Ordering::SeqCst) {} | ||||||||||||||
| self.push_value(Val::I32(0)); | ||||||||||||||
| Ok(()) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| fn atomic_load( | ||||||||||||||
| &mut self, mem: &mut Memory<'m>, t: NumType, n: usize, m: MemArg, | ||||||||||||||
| ) -> Result<(), Error> { | ||||||||||||||
| &mut self, mem_param: &mut Memory<'m>, t: NumType, n: usize, m: MemArg, | ||||||||||||||
| ) -> Result<u32, Error> { | ||||||||||||||
| let i = self.pop_value().unwrap_i32(); | ||||||||||||||
| let mem = match self.mem_slice(mem, m, i, n / 8, true) { | ||||||||||||||
| let ea = match self.mem_slice_addr(mem_param, m, i, n / 8, true) { | ||||||||||||||
| None => return Err(trap()), | ||||||||||||||
| Some(x) => x, | ||||||||||||||
| }; | ||||||||||||||
| let mem = match self.mem_slice(mem_param, m, i, n / 8, true) { | ||||||||||||||
| None => return Err(trap()), | ||||||||||||||
| Some(x) => x, | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| macro_rules! convert { | ||||||||||||||
| ($T:ident, $t:ident, $s:ident) => {{ | ||||||||||||||
| let ptr = mem.as_mut_ptr() as *mut $s; | ||||||||||||||
|
|
@@ -1483,7 +1524,7 @@ impl<'m> Thread<'m> { | |||||||||||||
| _ => unreachable!(), | ||||||||||||||
| }; | ||||||||||||||
| self.push_value(c); | ||||||||||||||
| Ok(()) | ||||||||||||||
| Ok(ea) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
|
|
@@ -1651,6 +1692,56 @@ impl Table { | |||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| #[derive(Debug, Default)] | ||||||||||||||
| #[allow(dead_code)] | ||||||||||||||
| struct WaitingThread { | ||||||||||||||
| address: u32, | ||||||||||||||
| thread_id: u32, | ||||||||||||||
| pub notify: AtomicBool, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| #[derive(Debug, Default)] | ||||||||||||||
| struct ShareData { | ||||||||||||||
| queue: Vec<WaitingThread>, | ||||||||||||||
| lock: spin::Mutex<()>, | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, in Rust the data protected by the mutex is usually in the mutex. |
||||||||||||||
| max_thread_id: u32, | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| impl ShareData { | ||||||||||||||
| fn wait(&mut self, address: u32) -> &WaitingThread { | ||||||||||||||
| let _lock = self.lock.lock(); | ||||||||||||||
| self.queue.push(WaitingThread { | ||||||||||||||
| address, | ||||||||||||||
| thread_id: self.max_thread_id, | ||||||||||||||
| notify: false.into(), | ||||||||||||||
| }); | ||||||||||||||
| self.max_thread_id += 1; | ||||||||||||||
| return self.queue.last().unwrap(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn notify(&mut self, address: u32, number: u32) -> u32 { | ||||||||||||||
| let _lock = self.lock.lock(); | ||||||||||||||
| let mut notified = 0; | ||||||||||||||
| for thread in self.queue.iter_mut() { | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK notify {:?}", thread); | ||||||||||||||
| if notified >= number { | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| if thread.address == address && !thread.notify.load(portable_atomic::Ordering::SeqCst) { | ||||||||||||||
| thread.notify.store(true, portable_atomic::Ordering::SeqCst); | ||||||||||||||
| notified += 1 | ||||||||||||||
| } | ||||||||||||||
| #[cfg(feature = "debug")] | ||||||||||||||
| eprintln!("DBK notify {:?}", thread); | ||||||||||||||
| } | ||||||||||||||
| notified | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #[derive(Debug, Default)] | ||||||||||||||
| struct Memory<'m> { | ||||||||||||||
| // May be shorter than the maximum length for the module, but not larger. | ||||||||||||||
|
|
@@ -1665,6 +1756,8 @@ struct Memory<'m> { | |||||||||||||
| // https://github.com/google/wasefire/pull/513#discussion_r1652977484 | ||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| share: bool, | ||||||||||||||
| #[cfg(feature = "threads")] | ||||||||||||||
| share_data: ShareData, | ||||||||||||||
|
Comment on lines
1757
to
+1760
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: if it doesn't make sense to have
Suggested change
|
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| impl<'m> Memory<'m> { | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason not to split the store into mutable and immutable part and put the mutable part in the mutex?
Maybe the
threadsfield could becomeMutex<Vec<Mutex<Vec<Continuation>>>>where the outerMutex<Vecis the different concurrent threads and the innerMutex<Vecis the "call-stack" (with respect to host functions) of the given thread.