From 38fc5241695d03e7a63ad0b82045eac7ae5e6b5e Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Mon, 24 Oct 2022 13:25:56 -0700 Subject: [PATCH] Fix a race condition in error reporting for async open The intended behavior with async open is that sync errors related to the async open are delivered to the completion callback. This didn't work reliably even in the simplest scenario because we activated the sync session before attaching the download completion handler, so a sufficiently fast error could be received too early. This resulted in the async open completion callback getting called with a generic "operation cancelled" error rather than the correct one. To fix this, defer calling `revive_if_needed()` until after we've had the chance to call `wait_for_download_completion()` This requires significantly changing where this call happens. Scenarios where the Realm is being opened on multiple threads at once still aren't guaranteed to report errors correctly. I think that would require a very different approach to how async open works, and probably isn't something people are doing very much in practice. --- CHANGELOG.md | 1 + src/realm/object-store/audit.mm | 1 + .../object-store/impl/realm_coordinator.cpp | 9 ++-- .../object-store/sync/async_open_task.cpp | 10 ++-- src/realm/object-store/sync/sync_manager.cpp | 6 --- src/realm/object-store/sync/sync_user.cpp | 50 ++++++++----------- src/realm/object-store/sync/sync_user.hpp | 2 + test/object-store/realm.cpp | 16 +++++- test/object-store/sync/session/session.cpp | 8 +-- .../sync/session/session_util.hpp | 21 ++++++++ test/object-store/util/test_file.cpp | 5 +- test/object-store/util/test_file.hpp | 2 +- 12 files changed, 78 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8126486beae..af3aa42a922 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) * Calling `SectionedResults::reset_section_callback()` on a `SectionedResults` which had been evaluated would result in an assertion failure the next time the sections are evaluated ([PR #5965](https://github.com/realm/realm-core/pull/5965), since v12.10.0). * Opening an unencrypted file with an encryption key would sometimes report a misleading error message that indicated that the problem was something other than a decryption failure ([PR #5915](https://github.com/realm/realm-core/pull/5915), since 0.86.1). +* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open). ### Breaking changes * Websocket errors caused by the client sending a websocket message that is too large (i.e. greater than 16MB) now get reported as a `ProtocolError::limits_exceeded` error with a `ClientReset` requested by the server ([#5209](https://github.com/realm/realm-core/issues/5209)). diff --git a/src/realm/object-store/audit.mm b/src/realm/object-store/audit.mm index 4386504e7a5..b41af72c237 100644 --- a/src/realm/object-store/audit.mm +++ b/src/realm/object-store/audit.mm @@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type // there's any old ones sitting on disk waiting to be uploaded. scan_for_realms_to_upload(); }); + session->revive_if_needed(); } std::string AuditRealmPool::prefixed_partition(std::string const& partition) diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index 97dca55def7..436380d092d 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr realm = Realm::make_shared_realm(std::move(config), version, shared_from_this()); m_weak_realm_notifiers.emplace_back(realm, config.cache); - if (realm->config().audit_config) { #ifdef REALM_ENABLE_SYNC + if (m_sync_session && m_sync_session->user()->is_logged_in()) + m_sync_session->revive_if_needed(); + + if (realm->config().audit_config) { if (m_audit_context) m_audit_context->update_metadata(realm->config().audit_config->metadata); else m_audit_context = make_audit_context(m_db, realm->config()); + } #else + if (realm->config().audit_config) REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync"); #endif - } realm_lock.unlock_unchecked(); if (schema) { @@ -408,7 +412,6 @@ void RealmCoordinator::open_db() m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path); if (m_sync_session) { m_db = SyncSession::Internal::get_db(*m_sync_session); - m_sync_session->revive_if_needed(); init_external_helpers(); return; } diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index a7e645f76f4..b08fdbef8d5 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato void AsyncOpenTask::start(util::UniqueFunction callback) { - util::CheckedLockGuard lock(m_mutex); + util::CheckedUniqueLock lock(m_mutex); if (!m_session) return; - - m_session->revive_if_needed(); + auto session = m_session; + lock.unlock(); std::shared_ptr self(shared_from_this()); - m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { + session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { std::shared_ptr<_impl::RealmCoordinator> coordinator; { util::CheckedLockGuard lock(m_mutex); @@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunctionrevive_if_needed(); } void AsyncOpenTask::cancel() diff --git a/src/realm/object-store/sync/sync_manager.cpp b/src/realm/object-store/sync/sync_manager.cpp index 6420f1ef095..285b9b81cf0 100644 --- a/src/realm/object-store/sync/sync_manager.cpp +++ b/src/realm/object-store/sync/sync_manager.cpp @@ -651,13 +651,7 @@ std::shared_ptr SyncManager::get_session(std::shared_ptr db, co // Create the external reference immediately to ensure that the session will become // inactive if an exception is thrown in the following code. auto external_reference = shared_session->external_reference(); - // unlocking m_session_mutex here prevents a deadlock for synchronous network - // transports such as the unit test suite, in the case where the log in request is - // denied by the server: Active -> WaitingForAccessToken -> handle_refresh(401 - // error) -> user.log_out() -> unregister_session (locks m_session_mutex again) - lock.unlock(); config.sync_config->user->register_session(std::move(shared_session)); - return external_reference; } diff --git a/src/realm/object-store/sync/sync_user.cpp b/src/realm/object-store/sync/sync_user.cpp index fedcfe98de7..bc570c0808c 100644 --- a/src/realm/object-store/sync/sync_user.cpp +++ b/src/realm/object-store/sync/sync_user.cpp @@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& // Call set_state() rather than update_state_and_tokens to remove a user. REALM_UNREACHABLE(); case State::LoggedIn: - sessions_to_revive.reserve(m_waiting_sessions.size()); - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; case State::LoggedOut: { REALM_ASSERT(m_access_token == RealmJWT{}); @@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& emit_change_to_subscribers(*this); } +std::vector> SyncUser::revive_sessions() +{ + std::vector> sessions_to_revive; + sessions_to_revive.reserve(m_waiting_sessions.size()); + for (auto& [path, weak_session] : m_waiting_sessions) { + if (auto ptr = weak_session.lock()) { + m_sessions[path] = ptr; + sessions_to_revive.emplace_back(std::move(ptr)); + } + } + m_waiting_sessions.clear(); + return sessions_to_revive; +} + void SyncUser::update_refresh_token(std::string&& token) { std::vector> sessions_to_revive; @@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token) m_refresh_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_refresh_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token) m_access_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_access_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -357,10 +350,10 @@ void SyncUser::log_out() sync_manager_shared = m_sync_manager->shared_from_this(); // Move all active sessions into the waiting sessions pool. If the user is // logged back in, they will automatically be reactivated. - for (auto& pair : m_sessions) { - if (auto ptr = pair.second.lock()) { + for (auto& [path, weak_session] : m_sessions) { + if (auto ptr = weak_session.lock()) { ptr->log_out(); - m_waiting_sessions[pair.first] = ptr; + m_waiting_sessions[path] = std::move(ptr); } } m_sessions.clear(); @@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr session) util::CheckedUniqueLock lock(m_mutex); switch (m_state) { case State::LoggedIn: - // Immediately ask the session to come online. m_sessions[path] = session; - lock.unlock(); - session->revive_if_needed(); break; case State::LoggedOut: m_waiting_sessions[path] = session; diff --git a/src/realm/object-store/sync/sync_user.hpp b/src/realm/object-store/sync/sync_user.hpp index dff03907e6c..3eaff9ce4f7 100644 --- a/src/realm/object-store/sync/sync_user.hpp +++ b/src/realm/object-store/sync/sync_user.hpp @@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this, public Subscriba bool do_is_logged_in() const REQUIRES(m_tokens_mutex); + std::vector> revive_sessions() REQUIRES(m_mutex); + std::atomic m_state GUARDED_BY(m_mutex); util::AtomicSharedPtr m_binding_context; diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b3a9ac2f28..1238f1af95b 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { } SECTION("cancels download and reports an error on auth error") { - SyncTestFile config(init_sync_manager.app(), "realm"); + struct Transport : realm::app::GenericNetworkTransport { + void send_request_to_server( + const realm::app::Request&, + realm::util::UniqueFunction&& completion) override + { + completion(app::Response{403}); + } + }; + TestSyncManager::Config tsm_config; + tsm_config.transport = std::make_shared(); + TestSyncManager tsm(tsm_config); + + SyncTestFile config(tsm.app(), "realm"); config.sync_config->user->update_refresh_token(std::string(invalid_token)); config.sync_config->user->update_access_token(std::move(invalid_token)); @@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { task->start([&](auto ref, auto error) { std::lock_guard lock(mutex); REQUIRE(error); + REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403"); REQUIRE(!ref); called = true; }); - init_sync_manager.network_callback(app::Response{403}); util::EventLoop::main().run_until([&] { return called.load(); }); diff --git a/test/object-store/sync/session/session.cpp b/test/object-store/sync/session/session.cpp index 96ca3acb73c..e957f56f42a 100644 --- a/test/object-store/sync/session/session.cpp +++ b/test/object-store/sync/session/session.cpp @@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {}); // Run the runloop many iterations to see if the sessions spuriously bind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), @@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { REQUIRE(user->state() == SyncUser::State::LoggedOut); // Run the runloop many iterations to see if the sessions spuriously rebind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), diff --git a/test/object-store/sync/session/session_util.hpp b/test/object-store/sync/session/session_util.hpp index f055efce06c..0b910335792 100644 --- a/test/object-store/sync/session/session_util.hpp +++ b/test/object-store/sync/session/session_util.hpp @@ -29,6 +29,27 @@ using namespace realm; using namespace realm::util; +namespace Catch { +template <> +struct StringMaker { + static std::string convert(SyncSession::State state) + { + switch (state) { + case SyncSession::State::Active: + return "Active"; + case SyncSession::State::Dying: + return "Dying"; + case SyncSession::State::Inactive: + return "Inactive"; + case SyncSession::State::WaitingForAccessToken: + return "WaitingForAccessToken"; + default: + return "Unknown"; + } + } +}; +} // namespace Catch + inline bool sessions_are_active(const SyncSession& session) { return session.state() == SyncSession::State::Active; diff --git a/test/object-store/util/test_file.cpp b/test/object-store/util/test_file.cpp index 7a76de899c7..2f14b671110 100644 --- a/test/object-store/util/test_file.cpp +++ b/test/object-store/util/test_file.cpp @@ -333,11 +333,10 @@ TestAppSession::~TestAppSession() // MARK: - TestSyncManager TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config) - : m_sync_server(sync_server_config) + : transport(config.transport ? config.transport : std::make_shared(network_callback)) + , m_sync_server(sync_server_config) , m_should_teardown_test_directory(config.should_teardown_test_directory) { - if (config.transport) - transport = config.transport; app::App::Config app_config = config.app_config; set_app_config_defaults(app_config, transport); diff --git a/test/object-store/util/test_file.hpp b/test/object-store/util/test_file.hpp index a6366d373dc..b6750134d74 100644 --- a/test/object-store/util/test_file.hpp +++ b/test/object-store/util/test_file.hpp @@ -266,7 +266,7 @@ class TestSyncManager { realm::util::UniqueFunction& network_callback; }; - std::shared_ptr transport = std::make_shared(network_callback); + const std::shared_ptr transport; private: std::shared_ptr m_app;