Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition in error reporting for async open #5968

Merged
merged 1 commit into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open).

### Breaking changes
* None.
Expand Down
1 change: 1 addition & 0 deletions src/realm/object-store/audit.mm
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ bool write_event(Timestamp timestamp, StringData activity, StringData event_type
// there's any old ones sitting on disk waiting to be uploaded.
scan_for_realms_to_upload();
});
session->revive_if_needed();
}

std::string AuditRealmPool::prefixed_partition(std::string const& partition)
Expand Down
9 changes: 6 additions & 3 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,20 @@ void RealmCoordinator::do_get_realm(Realm::Config config, std::shared_ptr<Realm>
realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
m_weak_realm_notifiers.emplace_back(realm, config.cache);

if (realm->config().audit_config) {
#ifdef REALM_ENABLE_SYNC
if (m_sync_session && m_sync_session->user()->is_logged_in())
m_sync_session->revive_if_needed();

if (realm->config().audit_config) {
if (m_audit_context)
m_audit_context->update_metadata(realm->config().audit_config->metadata);
else
m_audit_context = make_audit_context(m_db, realm->config());
}
#else
if (realm->config().audit_config)
REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
#endif
}

realm_lock.unlock_unchecked();
if (schema) {
Expand Down Expand Up @@ -408,7 +412,6 @@ void RealmCoordinator::open_db()
m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
if (m_sync_session) {
m_db = SyncSession::Internal::get_db(*m_sync_session);
m_sync_session->revive_if_needed();
init_external_helpers();
return;
}
Expand Down
10 changes: 6 additions & 4 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato

void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::exception_ptr)> callback)
{
util::CheckedLockGuard lock(m_mutex);
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
return;

m_session->revive_if_needed();
auto session = m_session;
lock.unlock();

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
m_session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
session->wait_for_download_completion([callback = std::move(callback), self, this](std::error_code ec) {
std::shared_ptr<_impl::RealmCoordinator> coordinator;
{
util::CheckedLockGuard lock(m_mutex);
Expand All @@ -68,6 +68,8 @@ void AsyncOpenTask::start(util::UniqueFunction<void(ThreadSafeReference, std::ex
}
callback(std::move(realm), nullptr);
});

session->revive_if_needed();
}

void AsyncOpenTask::cancel()
Expand Down
36 changes: 19 additions & 17 deletions src/realm/object-store/sync/sync_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,34 +217,36 @@ void SyncManager::reset_for_testing()
m_users.clear();
m_current_user = nullptr;
}
{
util::CheckedLockGuard lock1(m_mutex);

{
util::CheckedLockGuard lock(m_mutex);
// Stop the client. This will abort any uploads that inactive sessions are waiting for.
if (m_sync_client)
m_sync_client->stop();
}

{
util::CheckedLockGuard lock2(m_session_mutex);
// Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions
// prior to calling `reset_for_testing`.
bool no_sessions = !do_has_existing_sessions();
REALM_ASSERT_RELEASE(no_sessions);

// Destroy any inactive sessions.
// FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to
// remain inactive until their final upload completes, at which point they are unregistered
// and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions
// should have already been destroyed.
m_sessions.clear();
}
{
util::CheckedLockGuard lock(m_session_mutex);
// Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions
// prior to calling `reset_for_testing`.
bool no_sessions = !do_has_existing_sessions();
REALM_ASSERT_RELEASE(no_sessions);

// Destroy any inactive sessions.
// FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to
// remain inactive until their final upload completes, at which point they are unregistered
// and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions
// should have already been destroyed.
m_sessions.clear();
}

{
util::CheckedLockGuard lock(m_mutex);
// Destroy the client now that we have no remaining sessions.
m_sync_client = nullptr;

// Reset even more state.
m_config = {};

m_sync_route = "";
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class SyncManager : public std::enable_shared_from_this<SyncManager> {
std::vector<std::shared_ptr<SyncUser>> m_users GUARDED_BY(m_user_mutex);
std::shared_ptr<SyncUser> m_current_user GUARDED_BY(m_user_mutex);

mutable std::unique_ptr<_impl::SyncClient> m_sync_client;
mutable std::unique_ptr<_impl::SyncClient> m_sync_client GUARDED_BY(m_mutex);

SyncClientConfig m_config GUARDED_BY(m_mutex);

Expand Down
50 changes: 20 additions & 30 deletions src/realm/object-store/sync/sync_user.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,7 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
// Call set_state() rather than update_state_and_tokens to remove a user.
REALM_UNREACHABLE();
case State::LoggedIn:
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
case State::LoggedOut: {
REALM_ASSERT(m_access_token == RealmJWT{});
Expand All @@ -217,6 +210,20 @@ void SyncUser::update_state_and_tokens(SyncUser::State state, const std::string&
emit_change_to_subscribers(*this);
}

std::vector<std::shared_ptr<SyncSession>> SyncUser::revive_sessions()
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
sessions_to_revive.reserve(m_waiting_sessions.size());
for (auto& [path, weak_session] : m_waiting_sessions) {
if (auto ptr = weak_session.lock()) {
m_sessions[path] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
return sessions_to_revive;
}

void SyncUser::update_refresh_token(std::string&& token)
{
std::vector<std::shared_ptr<SyncSession>> sessions_to_revive;
Expand All @@ -230,16 +237,9 @@ void SyncUser::update_refresh_token(std::string&& token)
m_refresh_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_refresh_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
Expand Down Expand Up @@ -272,16 +272,9 @@ void SyncUser::update_access_token(std::string&& token)
m_access_token = RealmJWT(std::move(token));
break;
case State::LoggedOut: {
sessions_to_revive.reserve(m_waiting_sessions.size());
m_access_token = RealmJWT(std::move(token));
m_state = State::LoggedIn;
for (auto& pair : m_waiting_sessions) {
if (auto ptr = pair.second.lock()) {
m_sessions[pair.first] = ptr;
sessions_to_revive.emplace_back(std::move(ptr));
}
}
m_waiting_sessions.clear();
sessions_to_revive = revive_sessions();
break;
}
}
Expand Down Expand Up @@ -357,10 +350,10 @@ void SyncUser::log_out()
sync_manager_shared = m_sync_manager->shared_from_this();
// Move all active sessions into the waiting sessions pool. If the user is
// logged back in, they will automatically be reactivated.
for (auto& pair : m_sessions) {
if (auto ptr = pair.second.lock()) {
for (auto& [path, weak_session] : m_sessions) {
if (auto ptr = weak_session.lock()) {
ptr->log_out();
m_waiting_sessions[pair.first] = ptr;
m_waiting_sessions[path] = std::move(ptr);
}
}
m_sessions.clear();
Expand Down Expand Up @@ -459,10 +452,7 @@ void SyncUser::register_session(std::shared_ptr<SyncSession> session)
util::CheckedUniqueLock lock(m_mutex);
switch (m_state) {
case State::LoggedIn:
// Immediately ask the session to come online.
m_sessions[path] = session;
lock.unlock();
session->revive_if_needed();
break;
case State::LoggedOut:
m_waiting_sessions[path] = session;
Expand Down
2 changes: 2 additions & 0 deletions src/realm/object-store/sync/sync_user.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ class SyncUser : public std::enable_shared_from_this<SyncUser>, public Subscriba

bool do_is_logged_in() const REQUIRES(m_tokens_mutex);

std::vector<std::shared_ptr<SyncSession>> revive_sessions() REQUIRES(m_mutex);

std::atomic<State> m_state GUARDED_BY(m_mutex);

util::AtomicSharedPtr<SyncUserContext> m_binding_context;
Expand Down
16 changes: 14 additions & 2 deletions test/object-store/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,19 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
}

SECTION("cancels download and reports an error on auth error") {
SyncTestFile config(init_sync_manager.app(), "realm");
struct Transport : realm::app::GenericNetworkTransport {
void send_request_to_server(
const realm::app::Request&,
realm::util::UniqueFunction<void(const realm::app::Response&)>&& completion) override
{
completion(app::Response{403});
}
};
TestSyncManager::Config tsm_config;
tsm_config.transport = std::make_shared<Transport>();
TestSyncManager tsm(tsm_config);

SyncTestFile config(tsm.app(), "realm");
config.sync_config->user->update_refresh_token(std::string(invalid_token));
config.sync_config->user->update_access_token(std::move(invalid_token));

Expand All @@ -860,10 +872,10 @@ TEST_CASE("Get Realm using Async Open", "[asyncOpen]") {
task->start([&](auto ref, auto error) {
std::lock_guard<std::mutex> lock(mutex);
REQUIRE(error);
REQUIRE_THROWS_WITH(std::rethrow_exception(error), "Client Error: 403");
REQUIRE(!ref);
called = true;
});
init_sync_manager.network_callback(app::Response{403});
util::EventLoop::main().run_until([&] {
return called.load();
});
Expand Down
8 changes: 4 additions & 4 deletions test/object-store/sync/session/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
auto session2 = sync_session(user, "/test1c-2", [](auto, auto) {});
// Run the runloop many iterations to see if the sessions spuriously bind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are just to get more useful reporting when the test fails.

REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
Expand Down Expand Up @@ -137,8 +137,8 @@ TEST_CASE("SyncSession: management by SyncUser", "[sync]") {
REQUIRE(user->state() == SyncUser::State::LoggedOut);
// Run the runloop many iterations to see if the sessions spuriously rebind.
spin_runloop();
REQUIRE(sessions_are_inactive(*session1));
REQUIRE(sessions_are_inactive(*session2));
REQUIRE(session1->state() == SyncSession::State::Inactive);
REQUIRE(session2->state() == SyncSession::State::Inactive);
REQUIRE(user->all_sessions().size() == 0);
// Log the user back in via the sync manager.
user = app->sync_manager()->get_user(user_id, ENCODE_FAKE_JWT("fake_refresh_token"),
Expand Down
21 changes: 21 additions & 0 deletions test/object-store/sync/session/session_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@
using namespace realm;
using namespace realm::util;

namespace Catch {
template <>
struct StringMaker<SyncSession::State> {
static std::string convert(SyncSession::State state)
{
switch (state) {
case SyncSession::State::Active:
return "Active";
case SyncSession::State::Dying:
return "Dying";
case SyncSession::State::Inactive:
return "Inactive";
case SyncSession::State::WaitingForAccessToken:
return "WaitingForAccessToken";
default:
return "Unknown";
}
}
};
} // namespace Catch

inline bool sessions_are_active(const SyncSession& session)
{
return session.state() == SyncSession::State::Active;
Expand Down
5 changes: 2 additions & 3 deletions test/object-store/util/test_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,10 @@ TestAppSession::~TestAppSession()
// MARK: - TestSyncManager

TestSyncManager::TestSyncManager(const Config& config, const SyncServer::Config& sync_server_config)
: m_sync_server(sync_server_config)
: transport(config.transport ? config.transport : std::make_shared<Transport>(network_callback))
, m_sync_server(sync_server_config)
, m_should_teardown_test_directory(config.should_teardown_test_directory)
{
if (config.transport)
transport = config.transport;
app::App::Config app_config = config.app_config;
set_app_config_defaults(app_config, transport);

Expand Down
2 changes: 1 addition & 1 deletion test/object-store/util/test_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TestSyncManager {

realm::util::UniqueFunction<void(const realm::app::Response&)>& network_callback;
};
std::shared_ptr<realm::app::GenericNetworkTransport> transport = std::make_shared<Transport>(network_callback);
const std::shared_ptr<realm::app::GenericNetworkTransport> transport;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assigning to transport after the TestSyncManager is constructed doesn't do anything useful and I wasted a bit of time discovering that, so I made it const.


private:
std::shared_ptr<realm::app::App> m_app;
Expand Down