Skip to content

Commit

Permalink
Fix a race condition in error reporting for async open (#5968)
Browse files Browse the repository at this point in the history
The intended behavior with async open is that sync errors related to the async
open are delivered to the completion callback. This didn't work reliably even
in the simplest scenario because we activated the sync session before attaching
the download completion handler, so a sufficiently fast error could be received
too early. This resulted in the async open completion callback getting called
with a generic "operation cancelled" error rather than the correct one.

To fix this, defer calling `revive_if_needed()` until after we've had the
chance to call `wait_for_download_completion()` This requires significantly
changing where this call happens.

Scenarios where the Realm is being opened on multiple threads at once still
aren't guaranteed to report errors correctly. I think that would require a very
different approach to how async open works, and probably isn't something people
are doing very much in practice.
  • Loading branch information
tgoyne authored Oct 26, 2022
1 parent cebcc27 commit 7ee0764
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 66 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open).

### Breaking changes
* None.
Expand Down
1 change: 1 addition & 0 deletions src/realm/object-store/audit.mm
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type
// there's any old ones sitting on disk waiting to be uploaded.
scan_for_realms_to_upload();
});
session->revive_if_needed();
}

std::string AuditRealmPool::prefixed_partition(std::string const& partition)
Expand Down
9 changes: 6 additions & 3 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr<Realm>
realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
m_weak_realm_notifiers.emplace_back(realm, config.cache);

if (realm->config().audit_config) {
#ifdef REALM_ENABLE_SYNC
if (m_sync_session && m_sync_session->user()->is_logged_in())
m_sync_session->revive_if_needed();

if (realm->config().audit_config) {
if (m_audit_context)
m_audit_context->update_metadata(realm->config().audit_config->metadata);
else
m_audit_context = make_audit_context(m_db, realm->config());
}
#else
if (realm->config().audit_config)
REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
#endif
}

realm_lock.unlock_unchecked();
if (schema) {
Expand Down Expand Up @@ -408,7 +412,6 @@ void RealmCoordinator::open_db()
m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
if (m_sync_session) {
m_db = SyncSession::Internal::get_db(*m_sync_session);
m_sync_session->revive_if_needed();
init_external_helpers();
return;
}
Expand Down
10 changes: 6 additions & 4 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato

void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::exception_ptr)> callback)
{
util::CheckedLockGuard lock(m_mutex);
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
return;

m_session->revive_if_needed();
auto session = m_session;
lock.unlock();

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
std::shared_ptr<_impl::RealmCoordinator> coordinator;
{
util::CheckedLockGuard lock(m_mutex);
Expand All @@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::ex
}
callback(std::move(realm), nullptr);
});

session->revive_if_needed();
}

void AsyncOpenTask::cancel()
Expand Down
36 changes: 19 additions & 17 deletions src/realm/object-store/sync/sync_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,34 +217,36 @@ void SyncManager::reset_for_testing()
m_users.clear();
m_current_user = nullptr;
}
{
util::CheckedLockGuard lock1(m_mutex);

{
util::CheckedLockGuard lock(m_mutex);
// Stop the client. This will abort any uploads that inactive sessions are waiting for.
if (m_sync_client)
m_sync_client->stop();
}

{
util::CheckedLockGuard lock2(m_session_mutex);
// Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions
// prior to calling `reset_for_testing`.
bool no_sessions = !do_has_existing_sessions();
REALM_ASSERT_RELEASE(no_sessions);

// Destroy any inactive sessions.
// FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to
// remain inactive until their final upload completes, at which point they are unregistered
// and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions
// should have already been destroyed.
m_sessions.clear();
}
{
util::CheckedLockGuard lock(m_session_mutex);
// Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions
// prior to calling `reset_for_testing`.
bool no_sessions = !do_has_existing_sessions();
REALM_ASSERT_RELEASE(no_sessions);

// Destroy any inactive sessions.
// FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to
// remain inactive until their final upload completes, at which point they are unregistered
// and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions
// should have already been destroyed.
m_sessions.clear();
}

{
util::CheckedLockGuard lock(m_mutex);
// Destroy the client now that we have no remaining sessions.
m_sync_client = nullptr;

// Reset even more state.
m_config = {};

m_sync_route = "";
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class SyncManager : public std::enable_shared_from_this<SyncManager> {
std::vector<std::shared_ptr<SyncUser>> m_users GUARDED_BY(m_user_mutex);
std::shared_ptr<SyncUser> m_current_user GUARDED_BY(m_user_mutex);

mutable std::unique_ptr<_impl::SyncClient> m_sync_client;
mutable std::unique_ptr<_impl::SyncClient> m_sync_client GUARDED_BY(m_mutex);

SyncClientConfig m_config GUARDED_BY(m_mutex);

Expand Down
50 changes: 20 additions & 30 deletions src/realm/object-store/sync/sync_user.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
// Call set_state() rather than update_state_and_tokens to remove a user.
REALM_UNREACHABLE();
case State::LoggedIn:
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
case State::LoggedOut: {
REALM_ASSERT(m_access_token == RealmJWT{});
Expand All @@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
emit_change_to_subscribers(*this);
}

std::vector<std::shared_ptr<SyncSession>> SyncUser::revive_sessions()
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& [path, weak_session] : m_waiting_sessions) {
if (auto ptr = weak_session.lock()) {
m_sessions[path] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
return sessions_to_revive;
}

void SyncUser::update_refresh_token(std::string&& token)
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
Expand All @@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token)
m_refresh_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_refresh_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
Expand Down Expand Up @@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token)
m_access_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_access_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
Expand Down Expand Up @@ -357,10 +350,10 @@ void SyncUser::log_out()
sync_manager_shared = m_sync_manager->shared_from_this();
// Move all active sessions into the waiting sessions pool. If the user is
// logged back in, they will automatically be reactivated.
for (auto& pair : m_sessions) {
if (auto ptr = pair.second.lock()) {
for (auto& [path, weak_session] : m_sessions) {
if (auto ptr = weak_session.lock()) {
ptr->log_out();
m_waiting_sessions[pair.first] = ptr;
m_waiting_sessions[path] = std::move(ptr);
}
}
m_sessions.clear();
Expand Down Expand Up @@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr<SyncSession> session)
util::CheckedUniqueLock lock(m_mutex);
switch (m_state) {
case State::LoggedIn:
// Immediately ask the session to come online.
m_sessions[path] = session;
lock.unlock();
session->revive_if_needed();
break;
case State::LoggedOut:
m_waiting_sessions[path] = session;
Expand Down
2 changes: 2 additions & 0 deletions src/realm/object-store/sync/sync_user.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this<SyncUser>, public Subscriba

bool do_is_logged_in() const REQUIRES(m_tokens_mutex);

std::vector<std::shared_ptr<SyncSession>> revive_sessions() REQUIRES(m_mutex);

std::atomic<State> m_state GUARDED_BY(m_mutex);

util::AtomicSharedPtr<SyncUserContext> m_binding_context;
Expand Down
16 changes: 14 additions & 2 deletions test/object-store/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
}

SECTION("cancels download and reports an error on auth error") {
SyncTestFile config(init_sync_manager.app(), "realm");
struct Transport : realm::app::GenericNetworkTransport {
void send_request_to_server(
const realm::app::Request&,
realm::util::UniqueFunction<void(const realm::app::Response&)>&& completion) override
{
completion(app::Response{403});
}
};
TestSyncManager::Config tsm_config;
tsm_config.transport = std::make_shared<Transport>();
TestSyncManager tsm(tsm_config);

SyncTestFile config(tsm.app(), "realm");
config.sync_config->user->update_refresh_token(std::string(invalid_token));
config.sync_config->user->update_access_token(std::move(invalid_token));

Expand All @@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
task->start([&](auto ref, auto error) {
std::lock_guard<std::mutex> lock(mutex);
REQUIRE(error);
REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403");
REQUIRE(!ref);
called = true;
});
init_sync_manager.network_callback(app::Response{403});
util::EventLoop::main().run_until([&] {
return called.load();
});
Expand Down
8 changes: 4 additions & 4 deletions test/object-store/sync/session/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {});
// Run the runloop many iterations to see if the sessions spuriously bind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
Expand Down Expand Up @@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
REQUIRE(user->state() == SyncUser::State::LoggedOut);
// Run the runloop many iterations to see if the sessions spuriously rebind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
Expand Down
21 changes: 21 additions & 0 deletions test/object-store/sync/session/session_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@
using namespace realm;
using namespace realm::util;

namespace Catch {
template <>
struct StringMaker<SyncSession::State> {
static std::string convert(SyncSession::State state)
{
switch (state) {
case SyncSession::State::Active:
return "Active";
case SyncSession::State::Dying:
return "Dying";
case SyncSession::State::Inactive:
return "Inactive";
case SyncSession::State::WaitingForAccessToken:
return "WaitingForAccessToken";
default:
return "Unknown";
}
}
};
} // namespace Catch

inline bool sessions_are_active(const SyncSession& session)
{
return session.state() == SyncSession::State::Active;
Expand Down
5 changes: 2 additions & 3 deletions test/object-store/util/test_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,10 @@ TestAppSession::~TestAppSession()
// MARK: - TestSyncManager

TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config)
: m_sync_server(sync_server_config)
: transport(config.transport ? config.transport : std::make_shared<Transport>(network_callback))
, m_sync_server(sync_server_config)
, m_should_teardown_test_directory(config.should_teardown_test_directory)
{
if (config.transport)
transport = config.transport;
app::App::Config app_config = config.app_config;
set_app_config_defaults(app_config, transport);

Expand Down
2 changes: 1 addition & 1 deletion test/object-store/util/test_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TestSyncManager {

realm::util::UniqueFunction<void(const realm::app::Response&)>& network_callback;
};
std::shared_ptr<realm::app::GenericNetworkTransport> transport = std::make_shared<Transport>(network_callback);
const std::shared_ptr<realm::app::GenericNetworkTransport> transport;

private:
std::shared_ptr<realm::app::App> m_app;
Expand Down

0 comments on commit 7ee0764

Please sign in to comment.