Fix a race condition in error reporting for async open
The intended behavior with async open is that sync errors related to the async
open are delivered to the completion callback. This didn't work reliably even
in the simplest scenario because we activated the sync session before attaching
the download completion handler, so a sufficiently fast error could be received
too early. This resulted in the async open completion callback getting called
with a generic "operation cancelled" error rather than the correct one.

To fix this, defer calling `revive_if_needed()` until after we've had the
chance to call `wait_for_download_completion()`. This requires significantly
changing where this call happens.

Scenarios where the Realm is being opened on multiple threads at once still
aren't guaranteed to report errors correctly. I think that would require a very
different approach to how async open works, and probably isn't something people
are doing very much in practice.
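
The ordering problem described above can be illustrated with a small, self-contained sketch. This is not the Realm implementation: `ToySession`, `wait_for_completion()`, `activate()`, and the two `open_*` helpers are hypothetical stand-ins, and the toy assumes the server rejects the connection synchronously during activation. It only shows why a handler attached after activation can miss the specific error and see a generic one instead.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy stand-in for a sync session; not the real SyncSession API.
class ToySession {
public:
    using Handler = std::function<void(const std::string&)>;

    // Register a completion handler (loosely analogous to
    // wait_for_download_completion()).
    void wait_for_completion(Handler handler)
    {
        if (m_closed) {
            // The session has already failed and shut down, so the specific
            // error is gone and only a generic one can be reported.
            handler("operation cancelled");
            return;
        }
        m_handlers.push_back(std::move(handler));
    }

    // Activate the session (loosely analogous to revive_if_needed()).
    // Assumption: in this toy the server rejects the connection immediately.
    void activate()
    {
        fail("Client Error: 403");
    }

private:
    // Deliver the error to every handler registered so far, then close.
    void fail(const std::string& error)
    {
        m_closed = true;
        std::vector<Handler> handlers;
        handlers.swap(m_handlers);
        for (auto& handler : handlers)
            handler(error);
    }

    bool m_closed = false;
    std::vector<Handler> m_handlers;
};

// Old ordering: activate first, then attach the handler.
std::string open_activate_first()
{
    ToySession session;
    std::string reported;
    session.activate();
    session.wait_for_completion([&](const std::string& error) {
        reported = error;
    });
    return reported;
}

// New ordering: attach the handler first, then activate.
std::string open_register_first()
{
    ToySession session;
    std::string reported;
    session.wait_for_completion([&](const std::string& error) {
        reported = error;
    });
    session.activate();
    return reported;
}
```

In this toy, `open_activate_first()` returns "operation cancelled" while `open_register_first()` returns "Client Error: 403", which mirrors the difference between the old and new call order in `AsyncOpenTask::start()`.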
tgoyne committed Oct 24, 2022
1 parent 64ede25 commit 8d330bf
Showing 12 changed files with 78 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
* Calling `SectionedResults::reset_section_callback()` on a `SectionedResults` which had been evaluated would result in an assertion failure the next time the sections are evaluated ([PR #5965](https://github.com/realm/realm-core/pull/5965), since v12.10.0).
* Opening an unencrypted file with an encryption key would sometimes report a misleading error message that indicated that the problem was something other than a decryption failure ([PR #5915](https://github.com/realm/realm-core/pull/5915), since 0.86.1).
* Fix a rare deadlock which could occur when closing a synchronized Realm immediately after committing a write transaction when the sync worker thread has also just finished processing a changeset from the server ([PR #5948](https://github.com/realm/realm-core/pull/5948)).
* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open).

### Breaking changes
* Websocket errors caused by the client sending a websocket message that is too large (i.e. greater than 16MB) now get reported as a `ProtocolError::limits_exceeded` error with a `ClientReset` requested by the server ([#5209](https://github.com/realm/realm-core/issues/5209)).
1 change: 1 addition & 0 deletions src/realm/object-store/audit.mm
@@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type
// there's any old ones sitting on disk waiting to be uploaded.
scan_for_realms_to_upload();
});
session->revive_if_needed();
}

std::string AuditRealmPool::prefixed_partition(std::string const& partition)
9 changes: 6 additions & 3 deletions src/realm/object-store/impl/realm_coordinator.cpp
@@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr<Realm>
realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
m_weak_realm_notifiers.emplace_back(realm, config.cache);

if (realm->config().audit_config) {
#ifdef REALM_ENABLE_SYNC
if (m_sync_session && m_sync_session->user()->is_logged_in())
m_sync_session->revive_if_needed();

if (realm->config().audit_config) {
if (m_audit_context)
m_audit_context->update_metadata(realm->config().audit_config->metadata);
else
m_audit_context = make_audit_context(m_db, realm->config());
}
#else
if (realm->config().audit_config)
REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
#endif
}

realm_lock.unlock_unchecked();
if (schema) {
@@ -408,7 +412,6 @@ void RealmCoordinator::open_db()
m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
if (m_sync_session) {
m_db = SyncSession::Internal::get_db(*m_sync_session);
m_sync_session->revive_if_needed();
init_external_helpers();
return;
}
10 changes: 6 additions & 4 deletions src/realm/object-store/sync/async_open_task.cpp
@@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato

void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::exception_ptr)> callback)
{
util::CheckedLockGuard lock(m_mutex);
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
return;

m_session->revive_if_needed();
auto session = m_session;
lock.unlock();

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
std::shared_ptr<_impl::RealmCoordinator> coordinator;
{
util::CheckedLockGuard lock(m_mutex);
@@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::ex
}
callback(std::move(realm), nullptr);
});

session->revive_if_needed();
}

void AsyncOpenTask::cancel()
6 changes: 0 additions & 6 deletions src/realm/object-store/sync/sync_manager.cpp
@@ -651,13 +651,7 @@ std::shared_ptr<SyncSession> SyncManager::get_session(std::shared_ptr<DB> db, co
// Create the external reference immediately to ensure that the session will become
// inactive if an exception is thrown in the following code.
auto external_reference = shared_session->external_reference();
// unlocking m_session_mutex here prevents a deadlock for synchronous network
// transports such as the unit test suite, in the case where the log in request is
// denied by the server: Active -> WaitingForAccessToken -> handle_refresh(401
// error) -> user.log_out() -> unregister_session (locks m_session_mutex again)
lock.unlock();
config.sync_config->user->register_session(std::move(shared_session));

return external_reference;
}

50 changes: 20 additions & 30 deletions src/realm/object-store/sync/sync_user.cpp
@@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
// Call set_state() rather than update_state_and_tokens to remove a user.
REALM_UNREACHABLE();
case State::LoggedIn:
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
case State::LoggedOut: {
REALM_ASSERT(m_access_token == RealmJWT{});
@@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
emit_change_to_subscribers(*this);
}

std::vector<std::shared_ptr<SyncSession>> SyncUser::revive_sessions()
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& [path, weak_session] : m_waiting_sessions) {
if (auto ptr = weak_session.lock()) {
m_sessions[path] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
return sessions_to_revive;
}

void SyncUser::update_refresh_token(std::string&& token)
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
@@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token)
m_refresh_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_refresh_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
@@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token)
m_access_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_access_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
@@ -357,10 +350,10 @@ void SyncUser::log_out()
sync_manager_shared = m_sync_manager->shared_from_this();
// Move all active sessions into the waiting sessions pool. If the user is
// logged back in, they will automatically be reactivated.
for (auto& pair : m_sessions) {
if (auto ptr = pair.second.lock()) {
for (auto& [path, weak_session] : m_sessions) {
if (auto ptr = weak_session.lock()) {
ptr->log_out();
m_waiting_sessions[pair.first] = ptr;
m_waiting_sessions[path] = std::move(ptr);
}
}
m_sessions.clear();
@@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr<SyncSession> session)
util::CheckedUniqueLock lock(m_mutex);
switch (m_state) {
case State::LoggedIn:
// Immediately ask the session to come online.
m_sessions[path] = session;
lock.unlock();
session->revive_if_needed();
break;
case State::LoggedOut:
m_waiting_sessions[path] = session;
2 changes: 2 additions & 0 deletions src/realm/object-store/sync/sync_user.hpp
@@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this<SyncUser>, public Subscriba

bool do_is_logged_in() const REQUIRES(m_tokens_mutex);

std::vector<std::shared_ptr<SyncSession>> revive_sessions() REQUIRES(m_mutex);

std::atomic<State> m_state GUARDED_BY(m_mutex);

util::AtomicSharedPtr<SyncUserContext> m_binding_context;
16 changes: 14 additions & 2 deletions test/object-store/realm.cpp
@@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
}

SECTION("cancels download and reports an error on auth error") {
SyncTestFile config(init_sync_manager.app(), "realm");
struct Transport : realm::app::GenericNetworkTransport {
void send_request_to_server(
const realm::app::Request&,
realm::util::UniqueFunction<void(const realm::app::Response&)>&& completion) override
{
completion(app::Response{403});
}
};
TestSyncManager::Config tsm_config;
tsm_config.transport = std::make_shared<Transport>();
TestSyncManager tsm(tsm_config);

SyncTestFile config(tsm.app(), "realm");
config.sync_config->user->update_refresh_token(std::string(invalid_token));
config.sync_config->user->update_access_token(std::move(invalid_token));

@@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
task->start([&](auto ref, auto error) {
std::lock_guard<std::mutex> lock(mutex);
REQUIRE(error);
REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403");
REQUIRE(!ref);
called = true;
});
init_sync_manager.network_callback(app::Response{403});
util::EventLoop::main().run_until([&] {
return called.load();
});
8 changes: 4 additions & 4 deletions test/object-store/sync/session/session.cpp
@@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {});
// Run the runloop many iterations to see if the sessions spuriously bind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
@@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
REQUIRE(user->state() == SyncUser::State::LoggedOut);
// Run the runloop many iterations to see if the sessions spuriously rebind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
21 changes: 21 additions & 0 deletions test/object-store/sync/session/session_util.hpp
@@ -29,6 +29,27 @@
using namespace realm;
using namespace realm::util;

namespace Catch {
template <>
struct StringMaker<SyncSession::State> {
static std::string convert(SyncSession::State state)
{
switch (state) {
case SyncSession::State::Active:
return "Active";
case SyncSession::State::Dying:
return "Dying";
case SyncSession::State::Inactive:
return "Inactive";
case SyncSession::State::WaitingForAccessToken:
return "WaitingForAccessToken";
default:
return "Unknown";
}
}
};
} // namespace Catch

inline bool sessions_are_active(const SyncSession& session)
{
return session.state() == SyncSession::State::Active;
5 changes: 2 additions & 3 deletions test/object-store/util/test_file.cpp
@@ -333,11 +333,10 @@ TestAppSession::~TestAppSession()
// MARK: - TestSyncManager

TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config)
: m_sync_server(sync_server_config)
: transport(config.transport ? config.transport : std::make_shared<Transport>(network_callback))
, m_sync_server(sync_server_config)
, m_should_teardown_test_directory(config.should_teardown_test_directory)
{
if (config.transport)
transport = config.transport;
app::App::Config app_config = config.app_config;
set_app_config_defaults(app_config, transport);

2 changes: 1 addition & 1 deletion test/object-store/util/test_file.hpp
@@ -266,7 +266,7 @@ class TestSyncManager {

realm::util::UniqueFunction<void(const realm::app::Response&)>& network_callback;
};
std::shared_ptr<realm::app::GenericNetworkTransport> transport = std::make_shared<Transport>(network_callback);
const std::shared_ptr<realm::app::GenericNetworkTransport> transport;

private:
std::shared_ptr<realm::app::App> m_app;
