diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eed6335aec..a3964f87577 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open). ### Breaking changes * None. diff --git a/src/realm/object-store/audit.mm b/src/realm/object-store/audit.mm index 4386504e7a5..b41af72c237 100644 --- a/src/realm/object-store/audit.mm +++ b/src/realm/object-store/audit.mm @@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type // there's any old ones sitting on disk waiting to be uploaded. scan_for_realms_to_upload(); }); + session->revive_if_needed(); } std::string AuditRealmPool::prefixed_partition(std::string const& partition) diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index 97dca55def7..436380d092d 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr realm = Realm::make_shared_realm(std::move(config), version, shared_from_this()); m_weak_realm_notifiers.emplace_back(realm, config.cache); - if (realm->config().audit_config) { #ifdef REALM_ENABLE_SYNC + if (m_sync_session && m_sync_session->user()->is_logged_in()) + m_sync_session->revive_if_needed(); + + if (realm->config().audit_config) { if (m_audit_context) m_audit_context->update_metadata(realm->config().audit_config->metadata); else m_audit_context = make_audit_context(m_db, realm->config()); + } #else + if (realm->config().audit_config) REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync"); #endif - } realm_lock.unlock_unchecked(); if (schema) { @@ -408,7 +412,6 @@ void RealmCoordinator::open_db() m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path); if (m_sync_session) { m_db = SyncSession::Internal::get_db(*m_sync_session); - m_sync_session->revive_if_needed(); init_external_helpers(); return; } diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index a7e645f76f4..b08fdbef8d5 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato void AsyncOpenTask::start(util::UniqueFunction callback) { - util::CheckedLockGuard lock(m_mutex); + util::CheckedUniqueLock lock(m_mutex); if (!m_session) return; - - m_session->revive_if_needed(); + auto session = m_session; + lock.unlock(); std::shared_ptr self(shared_from_this()); - m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { + session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) { std::shared_ptr<_impl::RealmCoordinator> coordinator; { util::CheckedLockGuard lock(m_mutex); @@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunctionrevive_if_needed(); } void AsyncOpenTask::cancel() diff --git a/src/realm/object-store/sync/sync_manager.cpp b/src/realm/object-store/sync/sync_manager.cpp index 6420f1ef095..0c69c34dd65 100644 --- a/src/realm/object-store/sync/sync_manager.cpp +++ b/src/realm/object-store/sync/sync_manager.cpp @@ -217,34 +217,36 @@ void SyncManager::reset_for_testing() m_users.clear(); m_current_user = nullptr; } - { - util::CheckedLockGuard lock1(m_mutex); + { + util::CheckedLockGuard lock(m_mutex); // Stop the client. This will abort any uploads that inactive sessions are waiting for. if (m_sync_client) m_sync_client->stop(); + } - { - util::CheckedLockGuard lock2(m_session_mutex); - // Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions - // prior to calling `reset_for_testing`. - bool no_sessions = !do_has_existing_sessions(); - REALM_ASSERT_RELEASE(no_sessions); - - // Destroy any inactive sessions. - // FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to - // remain inactive until their final upload completes, at which point they are unregistered - // and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions - // should have already been destroyed. - m_sessions.clear(); - } + { + util::CheckedLockGuard lock(m_session_mutex); + // Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions + // prior to calling `reset_for_testing`. + bool no_sessions = !do_has_existing_sessions(); + REALM_ASSERT_RELEASE(no_sessions); + + // Destroy any inactive sessions. + // FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to + // remain inactive until their final upload completes, at which point they are unregistered + // and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions + // should have already been destroyed. + m_sessions.clear(); + } + { + util::CheckedLockGuard lock(m_mutex); // Destroy the client now that we have no remaining sessions. m_sync_client = nullptr; // Reset even more state. m_config = {}; - m_sync_route = ""; } diff --git a/src/realm/object-store/sync/sync_manager.hpp b/src/realm/object-store/sync/sync_manager.hpp index fe9fb44c300..abcf8187529 100644 --- a/src/realm/object-store/sync/sync_manager.hpp +++ b/src/realm/object-store/sync/sync_manager.hpp @@ -281,7 +281,7 @@ class SyncManager : public std::enable_shared_from_this { std::vector> m_users GUARDED_BY(m_user_mutex); std::shared_ptr m_current_user GUARDED_BY(m_user_mutex); - mutable std::unique_ptr<_impl::SyncClient> m_sync_client; + mutable std::unique_ptr<_impl::SyncClient> m_sync_client GUARDED_BY(m_mutex); SyncClientConfig m_config GUARDED_BY(m_mutex); diff --git a/src/realm/object-store/sync/sync_user.cpp b/src/realm/object-store/sync/sync_user.cpp index fedcfe98de7..bc570c0808c 100644 --- a/src/realm/object-store/sync/sync_user.cpp +++ b/src/realm/object-store/sync/sync_user.cpp @@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& // Call set_state() rather than update_state_and_tokens to remove a user. REALM_UNREACHABLE(); case State::LoggedIn: - sessions_to_revive.reserve(m_waiting_sessions.size()); - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; case State::LoggedOut: { REALM_ASSERT(m_access_token == RealmJWT{}); @@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string& emit_change_to_subscribers(*this); } +std::vector> SyncUser::revive_sessions() +{ + std::vector> sessions_to_revive; + sessions_to_revive.reserve(m_waiting_sessions.size()); + for (auto& [path, weak_session] : m_waiting_sessions) { + if (auto ptr = weak_session.lock()) { + m_sessions[path] = ptr; + sessions_to_revive.emplace_back(std::move(ptr)); + } + } + m_waiting_sessions.clear(); + return sessions_to_revive; +} + void SyncUser::update_refresh_token(std::string&& token) { std::vector> sessions_to_revive; @@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token) m_refresh_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_refresh_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token) m_access_token = RealmJWT(std::move(token)); break; case State::LoggedOut: { - sessions_to_revive.reserve(m_waiting_sessions.size()); m_access_token = RealmJWT(std::move(token)); m_state = State::LoggedIn; - for (auto& pair : m_waiting_sessions) { - if (auto ptr = pair.second.lock()) { - m_sessions[pair.first] = ptr; - sessions_to_revive.emplace_back(std::move(ptr)); - } - } - m_waiting_sessions.clear(); + sessions_to_revive = revive_sessions(); break; } } @@ -357,10 +350,10 @@ void SyncUser::log_out() sync_manager_shared = m_sync_manager->shared_from_this(); // Move all active sessions into the waiting sessions pool. If the user is // logged back in, they will automatically be reactivated. - for (auto& pair : m_sessions) { - if (auto ptr = pair.second.lock()) { + for (auto& [path, weak_session] : m_sessions) { + if (auto ptr = weak_session.lock()) { ptr->log_out(); - m_waiting_sessions[pair.first] = ptr; + m_waiting_sessions[path] = std::move(ptr); } } m_sessions.clear(); @@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr session) util::CheckedUniqueLock lock(m_mutex); switch (m_state) { case State::LoggedIn: - // Immediately ask the session to come online. m_sessions[path] = session; - lock.unlock(); - session->revive_if_needed(); break; case State::LoggedOut: m_waiting_sessions[path] = session; diff --git a/src/realm/object-store/sync/sync_user.hpp b/src/realm/object-store/sync/sync_user.hpp index dff03907e6c..3eaff9ce4f7 100644 --- a/src/realm/object-store/sync/sync_user.hpp +++ b/src/realm/object-store/sync/sync_user.hpp @@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this, public Subscriba bool do_is_logged_in() const REQUIRES(m_tokens_mutex); + std::vector> revive_sessions() REQUIRES(m_mutex); + std::atomic m_state GUARDED_BY(m_mutex); util::AtomicSharedPtr m_binding_context; diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b3a9ac2f28..1238f1af95b 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { } SECTION("cancels download and reports an error on auth error") { - SyncTestFile config(init_sync_manager.app(), "realm"); + struct Transport : realm::app::GenericNetworkTransport { + void send_request_to_server( + const realm::app::Request&, + realm::util::UniqueFunction&& completion) override + { + completion(app::Response{403}); + } + }; + TestSyncManager::Config tsm_config; + tsm_config.transport = std::make_shared(); + TestSyncManager tsm(tsm_config); + + SyncTestFile config(tsm.app(), "realm"); config.sync_config->user->update_refresh_token(std::string(invalid_token)); config.sync_config->user->update_access_token(std::move(invalid_token)); @@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") { task->start([&](auto ref, auto error) { std::lock_guard lock(mutex); REQUIRE(error); + REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403"); REQUIRE(!ref); called = true; }); - init_sync_manager.network_callback(app::Response{403}); util::EventLoop::main().run_until([&] { return called.load(); }); diff --git a/test/object-store/sync/session/session.cpp b/test/object-store/sync/session/session.cpp index 96ca3acb73c..e957f56f42a 100644 --- a/test/object-store/sync/session/session.cpp +++ b/test/object-store/sync/session/session.cpp @@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {}); // Run the runloop many iterations to see if the sessions spuriously bind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), @@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") { REQUIRE(user->state() == SyncUser::State::LoggedOut); // Run the runloop many iterations to see if the sessions spuriously rebind. spin_runloop(); - REQUIRE(sessions_are_inactive(*session1)); - REQUIRE(sessions_are_inactive(*session2)); + REQUIRE(session1->state() == SyncSession::State::Inactive); + REQUIRE(session2->state() == SyncSession::State::Inactive); REQUIRE(user->all_sessions().size() == 0); // Log the user back in via the sync manager. user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"), diff --git a/test/object-store/sync/session/session_util.hpp b/test/object-store/sync/session/session_util.hpp index f055efce06c..0b910335792 100644 --- a/test/object-store/sync/session/session_util.hpp +++ b/test/object-store/sync/session/session_util.hpp @@ -29,6 +29,27 @@ using namespace realm; using namespace realm::util; +namespace Catch { +template <> +struct StringMaker { + static std::string convert(SyncSession::State state) + { + switch (state) { + case SyncSession::State::Active: + return "Active"; + case SyncSession::State::Dying: + return "Dying"; + case SyncSession::State::Inactive: + return "Inactive"; + case SyncSession::State::WaitingForAccessToken: + return "WaitingForAccessToken"; + default: + return "Unknown"; + } + } +}; +} // namespace Catch + inline bool sessions_are_active(const SyncSession& session) { return session.state() == SyncSession::State::Active; diff --git a/test/object-store/util/test_file.cpp b/test/object-store/util/test_file.cpp index 7a76de899c7..2f14b671110 100644 --- a/test/object-store/util/test_file.cpp +++ b/test/object-store/util/test_file.cpp @@ -333,11 +333,10 @@ TestAppSession::~TestAppSession() // MARK: - TestSyncManager TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config) - : m_sync_server(sync_server_config) + : transport(config.transport ? config.transport : std::make_shared(network_callback)) + , m_sync_server(sync_server_config) , m_should_teardown_test_directory(config.should_teardown_test_directory) { - if (config.transport) - transport = config.transport; app::App::Config app_config = config.app_config; set_app_config_defaults(app_config, transport); diff --git a/test/object-store/util/test_file.hpp b/test/object-store/util/test_file.hpp index a6366d373dc..b6750134d74 100644 --- a/test/object-store/util/test_file.hpp +++ b/test/object-store/util/test_file.hpp @@ -266,7 +266,7 @@ class TestSyncManager { realm::util::UniqueFunction& network_callback; }; - std::shared_ptr transport = std::make_shared(network_callback); + const std::shared_ptr transport; private: std::shared_ptr m_app;