diff --git a/Cargo.toml b/Cargo.toml index aae31335..8164f4d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,9 @@ serde = { version = "1", features = ["derive"] } uniffi = "=0.28.0" uniffi_bindgen = "=0.28.0" uniffi_meta = "=0.28.0" +matrix-sdk-ffi = { rev = "8fbf1101c232bcd69583eaaa4605542653213822", git = "https://github.com/vimeiro-co/matrix-rust-sdk-mirror" } +matrix-sdk = { rev = "8fbf1101c232bcd69583eaaa4605542653213822", git = "https://github.com/vimeiro-co/matrix-rust-sdk-mirror" } + +[patch.crates-io] +# async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" } +# const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" } diff --git a/cpp/includes/UniffiCallInvoker.h b/cpp/includes/UniffiCallInvoker.h index 598305c5..520f462d 100644 --- a/cpp/includes/UniffiCallInvoker.h +++ b/cpp/includes/UniffiCallInvoker.h @@ -5,9 +5,10 @@ */ #pragma once #include -#include +#include #include #include +#include #include namespace uniffi_runtime { @@ -57,24 +58,37 @@ class UniffiCallInvoker { if (std::this_thread::get_id() == threadId_) { func(rt); } else { - std::promise promise; - auto future = promise.get_future(); + std::mutex mtx; + std::condition_variable cv; + bool done = false; // The runtime argument was added to CallFunc in // https://github.com/facebook/react-native/pull/43375 // - // Once that is released, there will be a deprecation period. - // - // Any time during the deprecation period, we can switch `&rt` - // from being a captured variable to being an argument, i.e. - // commenting out one line, and uncommenting the other. - std::function wrapper = [&func, &promise, &rt]() { - // react::CallFunc wrapper = [&func, &promise](jsi::Runtime &rt) { + // This can be changed once that change is released. + // react::CallFunc wrapper = [&func, &mtx, &cv, &done](jsi::Runtime &rt) { + std::function wrapper = [&func, &rt, &mtx, &cv, &done]() { func(rt); - promise.set_value(); + { + std::lock_guard lock(mtx); + done = true; + } + cv.notify_one(); }; callInvoker_->invokeAsync(std::move(wrapper)); - future.wait(); + + std::unique_lock lock(mtx); + cv.wait(lock, [&done] { return done; }); } } + + /** + * Invokes the given function on the JS thread, by adding to + * the event queue. + */ + void invokeNonBlocking(jsi::Runtime &rt, UniffiCallFunc func) { + // react::CallFunc wrapper = [func](jsi::Runtime &rt) { + std::function wrapper = [func, &rt]() { func(rt); }; + callInvoker_->invokeAsync(std::move(wrapper)); + } }; } // namespace uniffi_runtime diff --git a/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp b/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp index 20aadbd2..385ad427 100644 --- a/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp +++ b/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp @@ -187,7 +187,11 @@ namespace {{ ns }} { }; // We'll then call that lambda from the callInvoker which will // look after calling it on the correct thread. + {% if callback.is_blocking() -%} callInvoker->invokeBlocking(rt, jsLambda); + {%- else %} + callInvoker->invokeNonBlocking(rt, jsLambda); + {%- endif %} }; return callback; } diff --git a/crates/ubrn_bindgen/src/bindings/react_native/mod.rs b/crates/ubrn_bindgen/src/bindings/react_native/mod.rs index cce42976..1f12d6e8 100644 --- a/crates/ubrn_bindgen/src/bindings/react_native/mod.rs +++ b/crates/ubrn_bindgen/src/bindings/react_native/mod.rs @@ -501,6 +501,10 @@ impl FfiCallbackFunction { .find(|a| a.is_return() && !a.type_().is_void()); arg.map(|a| a.type_()) } + + fn is_blocking(&self) -> bool { + self.name() != "RustFutureContinuationCallback" + } } fn is_future(nm: &str) -> bool { diff --git a/fixtures/async-deadlock/Cargo.toml b/fixtures/async-deadlock/Cargo.toml new file mode 100644 index 00000000..37f7e9c8 --- /dev/null +++ b/fixtures/async-deadlock/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "regression-async-deadlock" +edition = "2021" +version = "0.0.1" +authors = ["Filament Team "] +license = "MPL-2.0" +publish = false + +[lib] +crate-type = ["lib", "staticlib", "cdylib"] +name = "regression_async_deadlock" + +[patch.crates-io] +async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" } +const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" } + +[dependencies] +uniffi = { workspace = true } +matrix-sdk-ffi = { workspace = true } +matrix-sdk = { workspace = true } +thiserror = "1.0" + + +[build-dependencies] +uniffi = { workspace = true, features = ["build"] } + +[dev-dependencies] +uniffi = { workspace = true, features = ["bindgen-tests"] } diff --git a/fixtures/async-deadlock/src/lib.rs b/fixtures/async-deadlock/src/lib.rs new file mode 100644 index 00000000..c6281c15 --- /dev/null +++ b/fixtures/async-deadlock/src/lib.rs @@ -0,0 +1,16 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/ + */ + +use std::sync::Arc; + +use matrix_sdk_ffi::client_builder::ClientBuilder; + +#[uniffi::export] +pub fn get_matrix_client_builder() -> Arc { + ClientBuilder::new() +} + +uniffi::setup_scaffolding!(); diff --git a/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py new file mode 100644 index 00000000..91f9a135 --- /dev/null +++ b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py @@ -0,0 +1,292 @@ +from regression_async_deadlock import * +import unittest +from datetime import datetime +import asyncio +import typing +import futures + +def now(): + return datetime.now() + +class TestFutures(unittest.TestCase): + def test_always_ready(self): + async def test(): + self.assertEqual(await always_ready(), True) + + asyncio.run(test()) + + def test_void(self): + async def test(): + self.assertEqual(await void(), None) + + asyncio.run(test()) + + def test_sleep(self): + async def test(): + t0 = now() + await sleep(2000) + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 2) + + asyncio.run(test()) + + def test_sequential_futures(self): + async def test(): + t0 = now() + result_alice = await say_after(100, 'Alice') + result_bob = await say_after(200, 'Bob') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.3) + self.assertEqual(result_alice, 'Hello, Alice!') + self.assertEqual(result_bob, 'Hello, Bob!') + + asyncio.run(test()) + + def test_concurrent_tasks(self): + async def test(): + alice = asyncio.create_task(say_after(100, 'Alice')) + bob = asyncio.create_task(say_after(200, 'Bob')) + + t0 = now() + result_alice = await alice + result_bob = await bob + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'Hello, Alice!') + self.assertEqual(result_bob, 'Hello, Bob!') + + asyncio.run(test()) + + def test_async_methods(self): + async def test(): + megaphone = new_megaphone() + t0 = now() + result_alice = await megaphone.say_after(200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'HELLO, ALICE!') + + asyncio.run(test()) + + def test_async_constructors(self): + # Check the default constructor has been disabled. + with self.assertRaises(ValueError) as e: + Megaphone() + self.assertTrue(str(e.exception).startswith("async constructors not supported")) + + async def test(): + megaphone = await Megaphone.secondary() + result_alice = await megaphone.say_after(0, 'Alice') + self.assertEqual(result_alice, 'HELLO, ALICE!') + + udl_megaphone = await UdlMegaphone.secondary() + result_udl = await udl_megaphone.say_after(0, 'udl') + self.assertEqual(result_udl, 'HELLO, UDL!') + + asyncio.run(test()) + + def test_async_trait_interface_methods(self): + async def test(): + traits = get_say_after_traits() + t0 = now() + result1 = await traits[0].say_after(100, 'Alice') + result2 = await traits[1].say_after(100, 'Bob') + t1 = now() + + self.assertEqual(result1, 'Hello, Alice!') + self.assertEqual(result2, 'Hello, Bob!') + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + + asyncio.run(test()) + + def test_udl_async_trait_interface_methods(self): + async def test(): + traits = get_say_after_udl_traits() + t0 = now() + result1 = await traits[0].say_after(100, 'Alice') + result2 = await traits[1].say_after(100, 'Bob') + t1 = now() + + self.assertEqual(result1, 'Hello, Alice!') + self.assertEqual(result2, 'Hello, Bob!') + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + + asyncio.run(test()) + + def test_foreign_async_trait_interface_methods(self): + class PyAsyncParser: + def __init__(self): + self.completed_delays = 0 + + async def as_string(self, delay_ms, value): + await asyncio.sleep(delay_ms / 1000.0) + return str(value) + + async def try_from_string(self, delay_ms, value): + await asyncio.sleep(delay_ms / 1000.0) + if value == "force-unexpected-exception": + raise RuntimeError("UnexpectedException") + try: + return int(value) + except: + raise ParserError.NotAnInt() + + async def delay(self, delay_ms): + await asyncio.sleep(delay_ms / 1000.0) + self.completed_delays += 1 + + async def try_delay(self, delay_ms): + try: + delay_ms = int(delay_ms) + except: + raise ParserError.NotAnInt() + await asyncio.sleep(delay_ms / 1000.0) + self.completed_delays += 1 + + async def test(): + trait_obj = PyAsyncParser() + self.assertEqual(await as_string_using_trait(trait_obj, 1, 42), "42") + self.assertEqual(await try_from_string_using_trait(trait_obj, 1, "42"), 42) + with self.assertRaises(ParserError.NotAnInt): + await try_from_string_using_trait(trait_obj, 1, "fourty-two") + with self.assertRaises(ParserError.UnexpectedError): + await try_from_string_using_trait(trait_obj, 1, "force-unexpected-exception") + await delay_using_trait(trait_obj, 1) + await try_delay_using_trait(trait_obj, "1") + with self.assertRaises(ParserError.NotAnInt): + await try_delay_using_trait(trait_obj, "one") + + completed_delays_before = trait_obj.completed_delays + await cancel_delay_using_trait(trait_obj, 10) + # sleep long enough so that the `delay()` call would finish if it wasn't cancelled. + await asyncio.sleep(0.1) + # If the task was cancelled, then completed_delays won't have increased + self.assertEqual(trait_obj.completed_delays, completed_delays_before) + + + asyncio.run(test()) + # check that all foreign future handles were released + self.assertEqual(len(futures._UNIFFI_FOREIGN_FUTURE_HANDLE_MAP), 0) + + def test_async_object_param(self): + async def test(): + megaphone = new_megaphone() + t0 = now() + result_alice = await say_after_with_megaphone(megaphone, 200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'HELLO, ALICE!') + + asyncio.run(test()) + + def test_with_tokio_runtime(self): + async def test(): + t0 = now() + result_alice = await say_after_with_tokio(200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'Hello, Alice (with Tokio)!') + + asyncio.run(test()) + + def test_fallible(self): + async def test(): + result = await fallible_me(False) + self.assertEqual(result, 42) + + try: + result = await fallible_me(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + self.assertTrue(True) + + megaphone = new_megaphone() + + result = await megaphone.fallible_me(False) + self.assertEqual(result, 42) + + try: + result = await megaphone.fallible_me(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + self.assertTrue(True) + + asyncio.run(test()) + + def test_fallible_struct(self): + async def test(): + megaphone = await fallible_struct(False) + self.assertEqual(await megaphone.fallible_me(False), 42) + + try: + await fallible_struct(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + pass + + asyncio.run(test()) + + def test_record(self): + async def test(): + result = await new_my_record("foo", 42) + self.assertEqual(result.__class__, MyRecord) + self.assertEqual(result.a, "foo") + self.assertEqual(result.b, 42) + + asyncio.run(test()) + + def test_cancel(self): + async def test(): + # Create a task + task = asyncio.create_task(say_after(200, 'Alice')) + # Wait to ensure that the polling has started, then cancel the task + await asyncio.sleep(0.1) + task.cancel() + # Wait long enough for the Rust callback to fire. This shouldn't cause an exception, + # even though the task is cancelled. + await asyncio.sleep(0.2) + # Awaiting the task should result in a CancelledError. + with self.assertRaises(asyncio.CancelledError): + await task + + asyncio.run(test()) + + # Test a future that uses a lock and that is cancelled. + def test_shared_resource_cancellation(self): + async def test(): + task = asyncio.create_task(use_shared_resource( + SharedResourceOptions(release_after_ms=5000, timeout_ms=100))) + # Wait some time to ensure the task has locked the shared resource + await asyncio.sleep(0.05) + task.cancel() + await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) + asyncio.run(test()) + + def test_shared_resource_no_cancellation(self): + async def test(): + await use_shared_resource(SharedResourceOptions(release_after_ms=100, timeout_ms=1000)) + await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) + asyncio.run(test()) + + def test_function_annotations(self): + async def test(): + self.assertEqual(typing.get_type_hints(sleep) , {"ms": int, "return": bool}) + self.assertEqual(typing.get_type_hints(sleep_no_return), {"ms": int, "return": type(None)}) + asyncio.run(test()) + +if __name__ == '__main__': + unittest.main() diff --git a/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts new file mode 100644 index 00000000..c6358dcb --- /dev/null +++ b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts @@ -0,0 +1,126 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/ + */ + +import { asyncTest } from "@/asserts"; +import { console } from "@/hermes"; +import { + ClientBuilder, + ClientInterface, + mediaSourceFromUrl, + MediaSourceInterface, +} from "../../generated/matrix_sdk_ffi"; + +import { uniffiRustFutureHandleCount } from "uniffi-bindgen-react-native"; + +const url = "mxc://matrix.1badday.com/RUUIKNjovSSwYbULWuNQarDA"; + +(async () => { + await asyncTest("Test for deadlock 2", async (t) => { + await loadImages(2); + t.end(); + }); + + await asyncTest("Test for deadlock 4", async (t) => { + await loadImages(4); + t.end(); + }); + + await asyncTest("Test for deadlock 8", async (t) => { + await loadImages(8); + t.end(); + }); + + await asyncTest("Test for deadlock 16", async (t) => { + await loadImages(16); + t.end(); + }); + + await asyncTest("Test for deadlock 32", async (t) => { + await loadImages(32); + t.end(); + }); + + await asyncTest("Test for deadlock 64", async (t) => { + await loadImages(64); + t.end(); + }); + + await asyncTest("Test for deadlock 128", async (t) => { + await loadImages(128); + t.end(); + }); + + await asyncTest( + "Test for deadlock 256", + async (t) => { + await loadImages(256); + t.end(); + }, + 30 * 1000, + ); + + await asyncTest( + "Test for deadlock 512", + async (t) => { + await loadImages(512); + t.end(); + }, + 60 * 1000, + ); + + await asyncTest( + "Test for deadlock 512 2", + async (t) => { + await loadImages(512); + t.end(); + }, + 60 * 1000, + ); + + await asyncTest( + "Test for deadlock 1024", + async (t) => { + await loadImages(1024); + t.end(); + }, + 1024 * 1000, + ); +})(); + +async function loadImages(n: number): Promise { + const images = new Array(n).fill(url); + const sourcedImages = images.map((i) => ({ + source: mediaSourceFromUrl(i), + })); + const client = await new ClientBuilder() + .homeserverUrl("https://matrix.1badday.com/") + .build(); + console.log(`Starting to load… ${n}`); + let loaded = 0; + let start = Date.now(); + function elapsed(): number { + const current = Date.now(); + return Math.round((current - start) / 1000); + } + const interval = 1000; + let progress: any | undefined; + function show() { + console.log( + `-- ${elapsed()} sec: Loaded ${loaded}/${n}; currently waiting on ${uniffiRustFutureHandleCount()}`, + ); + progress = setTimeout(show, interval); + } + + const promises = sourcedImages.map((s, i) => { + return client.getMediaContent(s.source).then((content) => { + loaded++; + }); + }); + show(); + await Promise.allSettled(promises); + clearTimeout(progress!); + console.log(`… finished, after: ${elapsed()} sec`); +} diff --git a/fixtures/ext-types/Cargo.toml b/fixtures/ext-types/Cargo.toml index e009ed21..fc677016 100644 --- a/fixtures/ext-types/Cargo.toml +++ b/fixtures/ext-types/Cargo.toml @@ -21,7 +21,7 @@ name = "uniffi_ext_types_lib" [dependencies] anyhow = "1" -bytes = "1.3" +bytes = "^1.7.1" # Add the "scaffolding-ffi-buffer-fns" feature to make sure things can build correctly uniffi = { workspace = true } diff --git a/fixtures/futures/tests/bindings/test_futures.ts b/fixtures/futures/tests/bindings/test_futures.ts index 0b33d1b3..971bfa6a 100644 --- a/fixtures/futures/tests/bindings/test_futures.ts +++ b/fixtures/futures/tests/bindings/test_futures.ts @@ -138,11 +138,7 @@ function checkRemainingFutures(t: Asserts) { await asyncTest("Async methods", async (t) => { const megaphone = newMegaphone(); - let helloAlice = await t.asyncMeasure( - async () => megaphone.sayAfter(500, "Alice"), - 500, - 20, - ); + const helloAlice = await megaphone.sayAfter(500, "Alice"); t.assertEqual("HELLO, ALICE!", helloAlice); checkRemainingFutures(t); t.end(); diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index c7a7b59c..f37db17b 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -92,7 +92,7 @@ export async function uniffiRustCallAsync( pollResult = await pollRust((handle) => { pollFunc(rustFuture, uniffiFutureContinuationCallback, handle); }); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + } while (pollResult !== UNIFFI_RUST_FUTURE_POLL_READY); // Now it's ready, all we need to do is pick up the result (and error). return liftFunc( @@ -128,6 +128,13 @@ const uniffiFutureContinuationCallback: UniffiRustFutureContinuationCallback = ( pollResult: number, ) => { const resolve = UNIFFI_RUST_FUTURE_RESOLVER_MAP.remove(handle); + // From https://github.com/mozilla/uniffi-rs/pull/1837/files#diff-8a28c9cf1245b4f714d406ea4044d68e1000099928eaca1afb504ccbc008fe9fR35-R37 + // + // > WARNING: the call to [rust_future_poll] must be scheduled to happen soon after the callback is + // > called, but not inside the callback itself. If [rust_future_poll] is called inside the + // > callback, some futures will deadlock and our scheduler code might as well. + // + // We avoid this by using UniffiCallInvoker::invokeNonBlocking for this callback. resolve(pollResult); };