Skip to content

Commit

Permalink
Don't invoke the RustFutureContinuationCallback while holding the mutex
Browse files Browse the repository at this point in the history
As described in the comments, this could lead to a deadlock if the
foreign code immediately polled the future.  This doesn't currently
happen with any of the core bindings, but it might with other bindings
and it seems like a footgun in general.

To accomplish this, I made a separate class to handle the state and
moved the mutex to only wrap that state.  The state mutation happens
with the mutex locked and the callback invocation happens after it's
unlocked.
  • Loading branch information
bendk committed Dec 13, 2023
1 parent 4f4c989 commit 1dcc634
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 32 deletions.
14 changes: 7 additions & 7 deletions uniffi_core/src/ffi/rustfuture/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where
// This Mutex should never block if our code is working correctly, since there should not be
// multiple threads calling [Self::poll] and/or [Self::complete] at the same time.
future: Mutex<WrappedFuture<F, T, UT>>,
scheduler: Mutex<Scheduler>,
scheduler: Scheduler,
// UT is used as the generic parameter for [LowerReturn].
// Let's model this with PhantomData as a function that inputs a UT value.
_phantom: PhantomData<fn(UT) -> ()>,
Expand All @@ -218,7 +218,7 @@ where
pub(super) fn new(future: F, _tag: UT) -> Arc<Self> {
Arc::new(Self {
future: Mutex::new(WrappedFuture::new(future)),
scheduler: Mutex::new(Scheduler::new()),
scheduler: Scheduler::new(),
_phantom: PhantomData,
})
}
Expand All @@ -232,20 +232,20 @@ where
if ready {
callback(data, RustFuturePoll::Ready)
} else {
self.scheduler.lock().unwrap().store(callback, data);
self.scheduler.store(callback, data);
}
}

pub(super) fn is_cancelled(&self) -> bool {
self.scheduler.lock().unwrap().is_cancelled()
self.scheduler.is_cancelled()
}

pub(super) fn wake(&self) {
self.scheduler.lock().unwrap().wake();
self.scheduler.wake();
}

pub(super) fn cancel(&self) {
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
}

pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType {
Expand All @@ -254,7 +254,7 @@ where

pub(super) fn free(self: Arc<Self>) {
// Call cancel() to send any leftover data to the continuation callback
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
// Ensure we drop our inner future, releasing all held references
self.future.lock().unwrap().free();
}
Expand Down
134 changes: 109 additions & 25 deletions uniffi_core/src/ffi/rustfuture/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::mem;
use std::{mem, sync::Mutex};

use super::{RustFutureContinuationCallback, RustFuturePoll};

Expand All @@ -20,7 +20,12 @@ use super::{RustFutureContinuationCallback, RustFuturePoll};
/// state, invoking any future callbacks as soon as they're stored.
#[derive(Debug)]
pub(super) enum Scheduler {
pub(super) struct Scheduler {
state: Mutex<SchedulerState>,
}

#[derive(Debug)]
pub(super) enum SchedulerState {
/// No continuations set, neither wake() nor cancel() called.
Empty,
/// `wake()` was called when there was no continuation set. The next time `store` is called,
Expand All @@ -33,58 +38,137 @@ pub(super) enum Scheduler {
Set(RustFutureContinuationCallback, *const ()),
}

impl Scheduler {
pub(super) fn new() -> Self {
Self::Empty
/// Encapsulates a call to a RustFutureContinuationCallback
struct CallbackCall {
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
}

impl CallbackCall {
fn new(
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
) -> Self {
Self {
callback,
data,
poll_data,
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: *const ()) {
fn invoke(self) {
(self.callback)(self.data, self.poll_data)
}
}

/// The SchedulerState impl contains all the ways to mutate the inner state field.
///
/// These methods determine if we should call the continuation callback, but don't directly call
/// it. This is important, since if we called the callback, the foreign code might poll the future
/// again, which could lead to a deadlock if the Mutex around us was still locked.
impl SchedulerState {
fn store(
&mut self,
callback: RustFutureContinuationCallback,
data: *const (),
) -> Option<CallbackCall> {
match self {
Self::Empty => *self = Self::Set(callback, data),
Self::Empty => {
*self = Self::Set(callback, data);
None
}
Self::Set(old_callback, old_data) => {
log::error!(
"store: observed `Self::Set` state. Is poll() being called from multiple threads at once?"
"store: observed `SchedulerState::Set` state. Is poll() being called from multiple threads at once?"
);
old_callback(*old_data, RustFuturePoll::Ready);
let call = CallbackCall::new(*old_callback, *old_data, RustFuturePoll::Ready);
*self = Self::Set(callback, data);
Some(call)
}
Self::Waked => {
*self = Self::Empty;
callback(data, RustFuturePoll::MaybeReady);
}
Self::Cancelled => {
callback(data, RustFuturePoll::Ready);
Some(CallbackCall::new(
callback,
data,
RustFuturePoll::MaybeReady,
))
}
Self::Cancelled => Some(CallbackCall::new(callback, data, RustFuturePoll::Ready)),
}
}

pub(super) fn wake(&mut self) {
fn wake(&mut self) -> Option<CallbackCall> {
match self {
// If we had a continuation set, then call it and transition to the `Empty` state.
Self::Set(callback, old_data) => {
SchedulerState::Set(callback, old_data) => {
let old_data = *old_data;
let callback = *callback;
*self = Self::Empty;
callback(old_data, RustFuturePoll::MaybeReady);
*self = SchedulerState::Empty;
Some(CallbackCall::new(
callback,
old_data,
RustFuturePoll::MaybeReady,
))
}
// If we were in the `Empty` state, then transition to `Waked`. The next time `store`
// is called, we will immediately call the continuation.
Self::Empty => *self = Self::Waked,
SchedulerState::Empty => {
*self = SchedulerState::Waked;
None
}
// This is a no-op if we were in the `Cancelled` or `Waked` state.
_ => (),
_ => None,
}
}

fn cancel(&mut self) -> Option<CallbackCall> {
if let SchedulerState::Set(callback, old_data) =
mem::replace(self, SchedulerState::Cancelled)
{
Some(CallbackCall::new(callback, old_data, RustFuturePoll::Ready))
} else {
None
}
}
}

impl Scheduler {
pub(super) fn new() -> Self {
Self {
state: Mutex::new(SchedulerState::Empty),
}
}

pub(super) fn cancel(&mut self) {
if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) {
callback(old_data, RustFuturePoll::Ready);
/// Call a method on the inner state field
///
/// If it returns a callback to invoke, then make the call after releasing the mutex.
fn call_state_method(&self, f: impl Fn(&mut SchedulerState) -> Option<CallbackCall>) {
let mut state = self.state.lock().unwrap();
let callback_call = f(&mut state);
drop(state);
if let Some(callback_call) = callback_call {
callback_call.invoke()
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&self, callback: RustFutureContinuationCallback, data: *const ()) {
self.call_state_method(|state| state.store(callback, data))
}

pub(super) fn wake(&self) {
self.call_state_method(SchedulerState::wake)
}

pub(super) fn cancel(&self) {
self.call_state_method(SchedulerState::cancel)
}

pub(super) fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled)
matches!(*self.state.lock().unwrap(), SchedulerState::Cancelled)
}
}

Expand Down

0 comments on commit 1dcc634

Please sign in to comment.