Skip to content

Commit 3d14cbf

Browse files
committed
make thread.yield a no-op in non-blocking contexts
Per the proposed spec changes, `thread.yield` should return control to the guest immediately without allowing any other thread to run. Similarly, when an async-lifted export or callback returns `CALLBACK_CODE_YIELD`, we should call the callback again immediately without allowing another thread to run. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 4693143 commit 3d14cbf

File tree

1 file changed

+90
-56
lines changed

1 file changed

+90
-56
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 90 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,10 @@ enum GuestCallKind {
600600
},
601601
/// Indicates that a new guest task call is pending and may be executed
602602
/// using the specified closure.
603-
StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
603+
///
604+
/// If the closure returns `Ok(Some(call))`, the `call` should be run
605+
/// immediately using `handle_guest_call`.
606+
StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
604607
StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
605608
}
606609

@@ -818,53 +821,58 @@ pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
818821

819822
/// Execute the specified guest call.
820823
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
821-
match call.kind {
822-
GuestCallKind::DeliverEvent { instance, set } => {
823-
let (event, waitable) = instance
824-
.get_event(store, call.thread.task, set, true)?
825-
.unwrap();
826-
let state = store.concurrent_state_mut();
827-
let task = state.get_mut(call.thread.task)?;
828-
let runtime_instance = task.instance;
829-
let handle = waitable.map(|(_, v)| v).unwrap_or(0);
824+
let mut next = Some(call);
825+
while let Some(call) = next.take() {
826+
match call.kind {
827+
GuestCallKind::DeliverEvent { instance, set } => {
828+
let (event, waitable) = instance
829+
.get_event(store, call.thread.task, set, true)?
830+
.unwrap();
831+
let state = store.concurrent_state_mut();
832+
let task = state.get_mut(call.thread.task)?;
833+
let runtime_instance = task.instance;
834+
let handle = waitable.map(|(_, v)| v).unwrap_or(0);
830835

831-
log::trace!(
832-
"use callback to deliver event {event:?} to {:?} for {waitable:?}",
833-
call.thread,
834-
);
836+
log::trace!(
837+
"use callback to deliver event {event:?} to {:?} for {waitable:?}",
838+
call.thread,
839+
);
835840

836-
let old_thread = state.guest_thread.replace(call.thread);
837-
log::trace!(
838-
"GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
839-
call.thread
840-
);
841+
let old_thread = state.guest_thread.replace(call.thread);
842+
log::trace!(
843+
"GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
844+
call.thread
845+
);
841846

842-
store.maybe_push_call_context(call.thread.task)?;
847+
store.maybe_push_call_context(call.thread.task)?;
843848

844-
let state = store.concurrent_state_mut();
845-
state.enter_instance(runtime_instance);
849+
let state = store.concurrent_state_mut();
850+
state.enter_instance(runtime_instance);
846851

847-
let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
852+
let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
848853

849-
let code = callback(store, runtime_instance, event, handle)?;
854+
let code = callback(store, runtime_instance, event, handle)?;
850855

851-
let state = store.concurrent_state_mut();
856+
let state = store.concurrent_state_mut();
852857

853-
state.get_mut(call.thread.task)?.callback = Some(callback);
854-
state.exit_instance(runtime_instance)?;
858+
state.get_mut(call.thread.task)?.callback = Some(callback);
859+
state.exit_instance(runtime_instance)?;
855860

856-
store.maybe_pop_call_context(call.thread.task)?;
861+
store.maybe_pop_call_context(call.thread.task)?;
857862

858-
instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
863+
next = instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
859864

860-
store.concurrent_state_mut().guest_thread = old_thread;
861-
log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
862-
}
863-
GuestCallKind::StartImplicit(fun) => {
864-
fun(store)?;
865-
}
866-
GuestCallKind::StartExplicit(fun) => {
867-
fun(store)?;
865+
store.concurrent_state_mut().guest_thread = old_thread;
866+
log::trace!(
867+
"GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
868+
);
869+
}
870+
GuestCallKind::StartImplicit(fun) => {
871+
next = fun(store)?;
872+
}
873+
GuestCallKind::StartExplicit(fun) => {
874+
fun(store)?;
875+
}
868876
}
869877
}
870878

@@ -1589,13 +1597,16 @@ impl Instance {
15891597

15901598
/// Handle the `CallbackCode` returned from an async-lifted export or its
15911599
/// callback.
1600+
///
1601+
/// If this returns `Ok(Some(call))`, then `call` should be run immediately
1602+
/// using `handle_guest_call`.
15921603
fn handle_callback_code(
15931604
self,
15941605
store: &mut StoreOpaque,
15951606
guest_thread: QualifiedThreadId,
15961607
runtime_instance: RuntimeComponentInstanceIndex,
15971608
code: u32,
1598-
) -> Result<()> {
1609+
) -> Result<Option<GuestCall>> {
15991610
let (code, set) = unpack_callback_code(code);
16001611

16011612
log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
@@ -1613,7 +1624,7 @@ impl Instance {
16131624
Ok(TableId::<WaitableSet>::new(set))
16141625
};
16151626

1616-
match code {
1627+
Ok(match code {
16171628
callback_code::EXIT => {
16181629
log::trace!("implicit thread {guest_thread:?} completed");
16191630
self.cleanup_thread(store, guest_thread, runtime_instance)?;
@@ -1633,20 +1644,30 @@ impl Instance {
16331644
task.callback = None;
16341645
}
16351646
}
1647+
None
16361648
}
16371649
callback_code::YIELD => {
1638-
// Push this thread onto the "low priority" queue so it runs after
1639-
// any other threads have had a chance to run.
16401650
let task = state.get_mut(guest_thread.task)?;
16411651
assert!(task.event.is_none());
16421652
task.event = Some(Event::None);
1643-
state.push_low_priority(WorkItem::GuestCall(GuestCall {
1653+
let call = GuestCall {
16441654
thread: guest_thread,
16451655
kind: GuestCallKind::DeliverEvent {
16461656
instance: self,
16471657
set: None,
16481658
},
1649-
}));
1659+
};
1660+
if state.may_block(guest_thread.task) {
1661+
// Push this thread onto the "low priority" queue so it runs
1662+
// after any other threads have had a chance to run.
1663+
state.push_low_priority(WorkItem::GuestCall(call));
1664+
None
1665+
} else {
1666+
// Yielding in a non-blocking context is defined as a no-op
1667+
// according to the spec, so we must run this thread
1668+
// immediately without allowing any others to run.
1669+
Some(call)
1670+
}
16501671
}
16511672
callback_code::WAIT | callback_code::POLL => {
16521673
state.check_blocking_for(guest_thread.task)?;
@@ -1698,11 +1719,10 @@ impl Instance {
16981719
_ => unreachable!(),
16991720
}
17001721
}
1722+
None
17011723
}
17021724
_ => bail!("unsupported callback code: {code}"),
1703-
}
1704-
1705-
Ok(())
1725+
})
17061726
}
17071727

17081728
fn cleanup_thread(
@@ -1872,10 +1892,9 @@ impl Instance {
18721892
// function returns a `i32` result.
18731893
let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
18741894

1875-
self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1876-
1877-
Ok(())
1878-
}) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1895+
self.handle_callback_code(store, guest_thread, callee_instance, code)
1896+
})
1897+
as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
18791898
} else {
18801899
let token = StoreToken::new(store.as_context_mut());
18811900
Box::new(move |store: &mut dyn VMStore| {
@@ -2011,7 +2030,7 @@ impl Instance {
20112030
}
20122031
}
20132032

2014-
Ok(())
2033+
Ok(None)
20152034
})
20162035
};
20172036

@@ -3079,9 +3098,20 @@ impl Instance {
30793098
) -> Result<WaitResult> {
30803099
self.id().get(store).check_may_leave(caller)?;
30813100

3082-
if to_thread.is_none() && !yielding {
3083-
// This is a `thread.suspend` call
3084-
store.concurrent_state_mut().check_blocking()?;
3101+
if to_thread.is_none() {
3102+
let state = store.concurrent_state_mut();
3103+
if yielding {
3104+
// This is a `thread.yield` call
3105+
if !state.may_block(state.guest_thread.unwrap().task) {
3106+
// The spec defines `thread.yield` to be a no-op in a
3107+
// non-blocking context, so we return immediately without giving
3108+
// any other thread a chance to run.
3109+
return Ok(WaitResult::Completed);
3110+
}
3111+
} else {
3112+
// This is a `thread.suspend` call
3113+
state.check_blocking()?;
3114+
}
30853115
}
30863116

30873117
// There could be a pending cancellation from a previous uncancellable wait
@@ -4795,13 +4825,17 @@ impl ConcurrentState {
47954825
}
47964826

47974827
fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4798-
let task = self.get_mut(task).unwrap();
4799-
if task.async_function || task.returned_or_cancelled() {
4828+
if self.may_block(task) {
48004829
Ok(())
48014830
} else {
48024831
Err(Trap::CannotBlockSyncTask.into())
48034832
}
48044833
}
4834+
4835+
fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4836+
let task = self.get_mut(task).unwrap();
4837+
task.async_function || task.returned_or_cancelled()
4838+
}
48054839
}
48064840

48074841
/// Provide a type hint to compiler about the shape of a parameter lower

0 commit comments

Comments
 (0)