From 6325750d40557cbfa8b1b524ad7bbf41583fa6e1 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Mon, 24 Oct 2022 13:25:56 -0700 Subject: [PATCH] Fix a race condition in error reporting for async open The intended behavior with async open is that sync errors related to the async open are delivered to the completion callback. This didn't work reliably even in the simplest scenario because we activated the sync session before attaching the download completion handler, so a sufficiently fast error could be received too early. This resulted in the async open completion callback getting called with a generic "operation cancelled" error rather than the correct one. To fix this, defer calling `revive_if_needed()` until after we've had the chance to call `wait_for_download_completion()` This requires significantly changing where this call happens. Scenarios where the Realm is being opened on multiple threads at once still aren't guaranteed to report errors correctly. I think that would require a very different approach to how async open works, and probably isn't something people are doing very much in practice. --- CHANGELOG.md | 2 +- src/realm/object-store/audit.mm | 1 + .../object-store/impl/realm_coordinator.cpp | 9 ++-- .../object-store/sync/async_open_task.cpp | 10 ++-- src/realm/object-store/sync/sync_manager.cpp | 36 ++++++------- src/realm/object-store/sync/sync_manager.hpp | 2 +- src/realm/object-store/sync/sync_user.cpp | 50 ++++++++----------- src/realm/object-store/sync/sync_user.hpp | 2 + test/object-store/realm.cpp | 16 +++++- test/object-store/sync/session/session.cpp | 8 +-- .../sync/session/session_util.hpp | 21 ++++++++ test/object-store/util/test_file.cpp | 5 +- test/object-store/util/test_file.hpp | 2 +- 13 files changed, 98 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eed6335aec..a3964f87577 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open). ### Breaking changes * None. diff --git a/src/realm/object-store/audit.mm b/src/realm/object-store/audit.mm index 4386504e7a5..b41af72c237 100644 --- a/src/realm/object-store/audit.mm +++ b/src/realm/object-store/audit.mm @@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type // there's any old ones sitting on disk waiting to be uploaded. scan_for_realms_to_upload(); }); + session->revive_if_needed(); } std::string AuditRealmPool::prefixed_partition(std::string const& partition) diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index 97dca55def7..436380d092d 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr realm = Realm::make_shared_realm(std::move(config), version, shared_from_this()); m_weak_realm_notifiers.emplace_back(realm, config.cache); - if (realm->config().audit_config) { #ifdef REALM_ENABLE_SYNC + if (m_sync_session && m_sync_session->user()->is_logged_in()) + m_sync_session->revive_if_needed(); + + if (realm->config().audit_config) { if (m_audit_context) m_audit_context->update_metadata(realm->config().audit_config->metadata); else m_audit_context = make_audit_context(m_db, realm->config()); + } #else + if (realm->config().audit_config) REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync"); #endif - } realm_lock.unlock_unchecked(); if (schema) { @@ -408,7 +412,6 @@ void RealmCoordinator::open_db() m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path); if (m_sync_session) { m_db = SyncSession::Internal::get_db(*m_sync_session); - m_sync_session->revive_if_needed(); init_external_helpers(); return; } diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index a7e645f76f4..b08fdbef8d5 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato void AsyncOpenTask::start(util::UniqueFunction callback) { - util::CheckedLockGuard lock(m_mutex); + util::CheckedUniqueLock lock(m_mutex); if (!m_session) return; - - m_session->revive_if_needed(); + auto session = m_session; + lock.unlock(); std::shared_ptr self(shared_from_this()); - m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { + session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { std::shared_ptr<_impl::RealmCoordinator> coordinator; { util::CheckedLockGuard lock(m_mutex); @@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunctionrevive_if_needed(); } void AsyncOpenTask::cancel() diff --git a/src/realm/object-store/sync/sync_manager.cpp b/src/realm/object-store/sync/sync_manager.cpp index 6420f1ef095..0c69c34dd65 100644 --- a/src/realm/object-store/sync/sync_manager.cpp +++ b/src/realm/object-store/sync/sync_manager.cpp @@ -217,34 +217,36 @@ void SyncManager::reset_for_testing() m_users.clear(); m_current_user = nullptr; } - { - util::CheckedLockGuard lock1(m_mutex); + { + util::CheckedLockGuard lock(m_mutex); // Stop the client. This will abort any uploads that inactive sessions are waiting for. if (m_sync_client) m_sync_client->stop(); + } - { - util::CheckedLockGuard lock2(m_session_mutex); - // Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions - // prior to calling `reset_for_testing`. - bool no_sessions = !do_has_existing_sessions(); - REALM_ASSERT_RELEASE(no_sessions); - - // Destroy any inactive sessions. - // FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to - // remain inactive until their final upload completes, at which point they are unregistered - // and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions - // should have already been destroyed. - m_sessions.clear(); - } + { + util::CheckedLockGuard lock(m_session_mutex); + // Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions + // prior to calling `reset_for_testing`. + bool no_sessions = !do_has_existing_sessions(); + REALM_ASSERT_RELEASE(no_sessions); + + // Destroy any inactive sessions. + // FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to + // remain inactive until their final upload completes, at which point they are unregistered + // and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions + // should have already been destroyed. + m_sessions.clear(); + } + { + util::CheckedLockGuard lock(m_mutex); // Destroy the client now that we have no remaining sessions. m_sync_client = nullptr; // Reset even more state. m_config = {}; - m_sync_route = ""; } diff --git a/src/realm/object-store/sync/sync_manager.hpp b/src/realm/object-store/sync/sync_manager.hpp index fe9fb44c300..abcf8187529 100644 --- a/src/realm/object-store/sync/sync_manager.hpp +++ b/src/realm/object-store/sync/sync_manager.hpp @@ -281,7 +281,7 @@ class SyncManager : public std::enable_shared_from_this { std::vector> m_users GUARDED_BY(m_user_mutex); std::shared_ptr m_current_user GUARDED_BY(m_user_mutex); - mutable std::unique_ptr<_impl::SyncClient> m_sync_client; + mutable std::unique_ptr<_impl::SyncClient> m_sync_client GUARDED_BY(m_mutex); SyncClientConfig m_config GUARDED_BY(m_mutex); diff --git a/src/realm/object-store/sync/sync_user.cpp b/src/realm/object-store/sync/sync_user.cpp index fedcfe98de7..bc570c0808c 100644 --- a/src/realm/object-store/sync/sync_user.cpp +++ b/src/realm/object-store/sync/sync_user.cpp @@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& // Call set_state() rather than update_state_and_tokens to remove a user. REALM_UNREACHABLE(); case State::LoggedIn: - sessions_to_revive.reserve(m_waiting_sessions.size()); - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; case State::LoggedOut: { REALM_ASSERT(m_access_token == RealmJWT{}); @@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& emit_change_to_subscribers(*this); } +std::vector> SyncUser::revive_sessions() +{ + std::vector> sessions_to_revive; + sessions_to_revive.reserve(m_waiting_sessions.size()); + for (auto& [path, weak_session] : m_waiting_sessions) { + if (auto ptr = weak_session.lock()) { + m_sessions[path] = ptr; + sessions_to_revive.emplace_back(std::move(ptr)); + } + } + m_waiting_sessions.clear(); + return sessions_to_revive; +} + void SyncUser::update_refresh_token(std::string&& token) { std::vector> sessions_to_revive; @@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token) m_refresh_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_refresh_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token) m_access_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_access_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -357,10 +350,10 @@ void SyncUser::log_out() sync_manager_shared = m_sync_manager->shared_from_this(); // Move all active sessions into the waiting sessions pool. If the user is // logged back in, they will automatically be reactivated. - for (auto& pair : m_sessions) { - if (auto ptr = pair.second.lock()) { + for (auto& [path, weak_session] : m_sessions) { + if (auto ptr = weak_session.lock()) { ptr->log_out(); - m_waiting_sessions[pair.first] = ptr; + m_waiting_sessions[path] = std::move(ptr); } } m_sessions.clear(); @@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr session) util::CheckedUniqueLock lock(m_mutex); switch (m_state) { case State::LoggedIn: - // Immediately ask the session to come online. m_sessions[path] = session; - lock.unlock(); - session->revive_if_needed(); break; case State::LoggedOut: m_waiting_sessions[path] = session; diff --git a/src/realm/object-store/sync/sync_user.hpp b/src/realm/object-store/sync/sync_user.hpp index dff03907e6c..3eaff9ce4f7 100644 --- a/src/realm/object-store/sync/sync_user.hpp +++ b/src/realm/object-store/sync/sync_user.hpp @@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this, public Subscriba bool do_is_logged_in() const REQUIRES(m_tokens_mutex); + std::vector> revive_sessions() REQUIRES(m_mutex); + std::atomic m_state GUARDED_BY(m_mutex); util::AtomicSharedPtr m_binding_context; diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b3a9ac2f28..1238f1af95b 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { } SECTION("cancels download and reports an error on auth error") { - SyncTestFile config(init_sync_manager.app(), "realm"); + struct Transport : realm::app::GenericNetworkTransport { + void send_request_to_server( + const realm::app::Request&, + realm::util::UniqueFunction&& completion) override + { + completion(app::Response{403}); + } + }; + TestSyncManager::Config tsm_config; + tsm_config.transport = std::make_shared(); + TestSyncManager tsm(tsm_config); + + SyncTestFile config(tsm.app(), "realm"); config.sync_config->user->update_refresh_token(std::string(invalid_token)); config.sync_config->user->update_access_token(std::move(invalid_token)); @@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { task->start([&](auto ref, auto error) { std::lock_guard lock(mutex); REQUIRE(error); + REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403"); REQUIRE(!ref); called = true; }); - init_sync_manager.network_callback(app::Response{403}); util::EventLoop::main().run_until([&] { return called.load(); }); diff --git a/test/object-store/sync/session/session.cpp b/test/object-store/sync/session/session.cpp index 96ca3acb73c..e957f56f42a 100644 --- a/test/object-store/sync/session/session.cpp +++ b/test/object-store/sync/session/session.cpp @@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {}); // Run the runloop many iterations to see if the sessions spuriously bind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), @@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { REQUIRE(user->state() == SyncUser::State::LoggedOut); // Run the runloop many iterations to see if the sessions spuriously rebind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), diff --git a/test/object-store/sync/session/session_util.hpp b/test/object-store/sync/session/session_util.hpp index f055efce06c..0b910335792 100644 --- a/test/object-store/sync/session/session_util.hpp +++ b/test/object-store/sync/session/session_util.hpp @@ -29,6 +29,27 @@ using namespace realm; using namespace realm::util; +namespace Catch { +template <> +struct StringMaker { + static std::string convert(SyncSession::State state) + { + switch (state) { + case SyncSession::State::Active: + return "Active"; + case SyncSession::State::Dying: + return "Dying"; + case SyncSession::State::Inactive: + return "Inactive"; + case SyncSession::State::WaitingForAccessToken: + return "WaitingForAccessToken"; + default: + return "Unknown"; + } + } +}; +} // namespace Catch + inline bool sessions_are_active(const SyncSession& session) { return session.state() == SyncSession::State::Active; diff --git a/test/object-store/util/test_file.cpp b/test/object-store/util/test_file.cpp index 7a76de899c7..2f14b671110 100644 --- a/test/object-store/util/test_file.cpp +++ b/test/object-store/util/test_file.cpp @@ -333,11 +333,10 @@ TestAppSession::~TestAppSession() // MARK: - TestSyncManager TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config) - : m_sync_server(sync_server_config) + : transport(config.transport ? config.transport : std::make_shared(network_callback)) + , m_sync_server(sync_server_config) , m_should_teardown_test_directory(config.should_teardown_test_directory) { - if (config.transport) - transport = config.transport; app::App::Config app_config = config.app_config; set_app_config_defaults(app_config, transport); diff --git a/test/object-store/util/test_file.hpp b/test/object-store/util/test_file.hpp index a6366d373dc..b6750134d74 100644 --- a/test/object-store/util/test_file.hpp +++ b/test/object-store/util/test_file.hpp @@ -266,7 +266,7 @@ class TestSyncManager { realm::util::UniqueFunction& network_callback; }; - std::shared_ptr transport = std::make_shared(network_callback); + const std::shared_ptr transport; private: std::shared_ptr m_app;