Skip to content

Commit

Permalink
Introduce CheckedRecursiveMutex and turn m_state_mutex into one. Acce…
Browse files Browse the repository at this point in the history
…ss m_sync_manager only under a lock.
  • Loading branch information
danieltabacaru committed Sep 29, 2022
1 parent 8ebd18f commit a8ba34b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 61 deletions.
83 changes: 36 additions & 47 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void SyncSession::become_active()
}
}

void SyncSession::become_dying(util::CheckedUniqueLock lock)
void SyncSession::become_dying(util::CheckedRecursiveLock lock)
{
REALM_ASSERT(m_state != State::Dying);
m_state = State::Dying;
Expand All @@ -117,7 +117,7 @@ void SyncSession::become_dying(util::CheckedUniqueLock lock)
m_session->async_wait_for_upload_completion(
[weak_session = weak_from_this(), current_death_count](std::error_code) {
if (auto session = weak_session.lock()) {
util::CheckedUniqueLock lock(session->m_state_mutex);
util::CheckedRecursiveLock lock(session->m_state_mutex);
if (session->m_state == State::Dying && session->m_death_count == current_death_count) {
session->become_inactive(std::move(lock));
}
Expand All @@ -126,7 +126,7 @@ void SyncSession::become_dying(util::CheckedUniqueLock lock)
m_state_mutex.unlock(lock);
}

void SyncSession::become_inactive(util::CheckedUniqueLock lock, std::error_code ec)
void SyncSession::become_inactive(util::CheckedRecursiveLock lock, std::error_code ec)
{
REALM_ASSERT(m_state != State::Inactive);
m_state = State::Inactive;
Expand All @@ -143,9 +143,10 @@ void SyncSession::become_inactive(util::CheckedUniqueLock lock, std::error_code
std::swap(waits, m_completion_callbacks);

m_session = nullptr;
auto& sync_manager = *m_sync_manager;
if (m_sync_manager) {
m_sync_manager->unregister_session(m_db->get_path());
}
m_state_mutex.unlock(lock);
sync_manager.unregister_session(m_db->get_path());

// Send notifications after releasing the lock to prevent deadlocks in the callback.
if (old_state != new_state) {
Expand All @@ -171,7 +172,7 @@ void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, std::er
{
// TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
cancel_pending_waits(std::move(lock), error_code);
}
if (user) {
Expand All @@ -190,7 +191,7 @@ SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session)
return [session](util::Optional<app::AppError> error) {
auto session_user = session->user();
if (!session_user) {
util::CheckedUniqueLock lock(session->m_state_mutex);
util::CheckedRecursiveLock lock(session->m_state_mutex);
session->cancel_pending_waits(std::move(lock), error ? error->error_code : std::error_code());
}
else if (error) {
Expand Down Expand Up @@ -223,7 +224,7 @@ SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session)
// to let the sync client attempt to reinitialize the connection using its own
// internal backoff timer which will happen automatically so nothing needs to
// happen here.
util::CheckedUniqueLock lock(session->m_state_mutex);
util::CheckedRecursiveLock lock(session->m_state_mutex);
if (session->m_state == State::WaitingForAccessToken) {
session->become_active();
}
Expand All @@ -245,7 +246,7 @@ SyncSession::SyncSession(SyncClient& client, std::shared_ptr<DB> db, const Realm
}

return sync::SubscriptionStore::create(m_db, [this](int64_t new_version) {
util::CheckedLockGuard lk(m_state_mutex);
util::CheckedRecursiveLock lk(m_state_mutex);
if (m_state != State::Active && m_state != State::WaitingForAccessToken) {
return;
}
Expand Down Expand Up @@ -288,19 +289,15 @@ SyncSession::SyncSession(SyncClient& client, std::shared_ptr<DB> db, const Realm

std::shared_ptr<SyncManager> SyncSession::sync_manager() const
{
util::CheckedLockGuard lk(m_state_mutex);
util::CheckedRecursiveLock lk(m_state_mutex);
REALM_ASSERT(m_sync_manager);
return m_sync_manager->shared_from_this();
}

void SyncSession::detach_from_sync_manager()
{
{
util::CheckedUniqueLock lock(m_state_mutex);
m_detaching_from_sync_manager = true;
}
shutdown_and_wait();
util::CheckedLockGuard lk(m_state_mutex);
util::CheckedRecursiveLock lk(m_state_mutex);
m_sync_manager = nullptr;
}

Expand Down Expand Up @@ -367,7 +364,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
return;
}

util::CheckedLockGuard state_lock(m_state_mutex);
util::CheckedRecursiveLock state_lock(m_state_mutex);
if (m_state != State::Active) {
return;
}
Expand Down Expand Up @@ -452,7 +449,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
void SyncSession::handle_fresh_realm_downloaded(DBRef db, util::Optional<std::string> error_message,
sync::ProtocolErrorInfo::Action server_requests_action)
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
if (m_state != State::Active) {
return;
}
Expand Down Expand Up @@ -495,7 +492,7 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, util::Optional<std::st
std::swap(m_completion_callbacks, callbacks);
// always swap back, even if advance_state throws
auto guard = util::make_scope_exit([&]() noexcept {
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
if (m_completion_callbacks.empty())
std::swap(callbacks, m_completion_callbacks);
else
Expand Down Expand Up @@ -623,7 +620,7 @@ void SyncSession::handle_error(SyncError error)
error.is_unrecognized_by_client = true;
}

util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
if (delete_file)
update_error_and_mark_file_for_deletion(error, *delete_file);

Expand Down Expand Up @@ -664,7 +661,7 @@ void SyncSession::handle_error(SyncError error)
}
}

void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, std::error_code error)
void SyncSession::cancel_pending_waits(util::CheckedRecursiveLock lock, std::error_code error)
{
CompletionCallbacks callbacks;
std::swap(callbacks, m_completion_callbacks);
Expand Down Expand Up @@ -790,7 +787,7 @@ void SyncSession::create_sync_session()
// Configure the sync transaction callback.
auto wrapped_callback = [weak_self](VersionID old_version, VersionID new_version) {
if (auto self = weak_self.lock()) {
util::CheckedLockGuard l(self->m_state_mutex);
util::CheckedRecursiveLock l(self->m_state_mutex);
if (self->m_sync_transact_callback) {
self->m_sync_transact_callback(old_version, new_version);
}
Expand Down Expand Up @@ -851,15 +848,15 @@ void SyncSession::create_sync_session()

void SyncSession::set_sync_transact_callback(util::UniqueFunction<sync::Session::SyncTransactCallback> callback)
{
util::CheckedLockGuard l(m_state_mutex);
util::CheckedRecursiveLock l(m_state_mutex);
m_sync_transact_callback = std::move(callback);
}

void SyncSession::nonsync_transact_notify(sync::version_type version)
{
m_progress_notifier.set_local_version(version);

util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
case State::WaitingForAccessToken:
Expand All @@ -875,7 +872,7 @@ void SyncSession::nonsync_transact_notify(sync::version_type version)

void SyncSession::revive_if_needed()
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
case State::WaitingForAccessToken:
Expand Down Expand Up @@ -903,7 +900,7 @@ void SyncSession::revive_if_needed()

void SyncSession::handle_reconnect()
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
m_session->cancel_reconnect_delay();
Expand All @@ -917,7 +914,7 @@ void SyncSession::handle_reconnect()

void SyncSession::log_out()
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
switch (m_state) {
case State::Active:
case State::Dying:
Expand All @@ -931,11 +928,11 @@ void SyncSession::log_out()

void SyncSession::close()
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
close(std::move(lock));
}

void SyncSession::close(util::CheckedUniqueLock lock)
void SyncSession::close(util::CheckedRecursiveLock lock)
{
switch (m_state) {
case State::Active: {
Expand All @@ -958,18 +955,10 @@ void SyncSession::close(util::CheckedUniqueLock lock)
m_state_mutex.unlock(lock);
break;
case State::Inactive: {
auto& sync_manager = *m_sync_manager;
auto needs_to_unregister = !m_detaching_from_sync_manager;
m_state_mutex.unlock(lock);
// There is a race if `detach_from_sync_manager()` and `close()` are called at the same time (the
// SyncManager may get deallocated just before unregistering the session). So if at this point
// `detach_from_sync_manager()` was called, we skip unregistration.
// Note: The session will not be unregistered if `detach_from_sync_manager()` is called while the session
// is in its initial state (i.e, `State::Inactive`), but that's not a problem since the SyncManager is
// being destroyed anyway.
if (needs_to_unregister) {
sync_manager.unregister_session(m_db->get_path());
if (m_sync_manager) {
m_sync_manager->unregister_session(m_db->get_path());
}
m_state_mutex.unlock(lock);
break;
}
case State::WaitingForAccessToken:
Expand All @@ -988,7 +977,7 @@ void SyncSession::shutdown_and_wait()
// sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
// Realm file to be closed. This works so long as this SyncSession object remains in the
// `inactive` state after the invocation of shutdown_and_wait().
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
if (m_state != State::Inactive) {
become_inactive(std::move(lock));
}
Expand All @@ -998,7 +987,7 @@ void SyncSession::shutdown_and_wait()

void SyncSession::update_access_token(const std::string& signed_token)
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
// We don't expect there to be a session when waiting for access token, but if there is, refresh its token.
// If not, the latest token will be seeded from SyncUser::access_token() on session creation.
if (m_session) {
Expand Down Expand Up @@ -1037,7 +1026,7 @@ void SyncSession::add_completion_callback(util::UniqueFunction<void(std::error_c
auto self = weak_self.lock();
if (!self)
return;
util::CheckedUniqueLock lock(self->m_state_mutex);
util::CheckedRecursiveLock lock(self->m_state_mutex);
auto callback_node = self->m_completion_callbacks.extract(id);
lock.unlock();
if (callback_node)
Expand All @@ -1047,13 +1036,13 @@ void SyncSession::add_completion_callback(util::UniqueFunction<void(std::error_c

void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(std::error_code)>&& callback)
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
add_completion_callback(std::move(callback), ProgressDirection::upload);
}

void SyncSession::wait_for_download_completion(util::UniqueFunction<void(std::error_code)>&& callback)
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
add_completion_callback(std::move(callback), ProgressDirection::download);
}

Expand Down Expand Up @@ -1082,7 +1071,7 @@ SyncSession::~SyncSession() {}

SyncSession::State SyncSession::state() const
{
util::CheckedUniqueLock lock(m_state_mutex);
util::CheckedRecursiveLock lock(m_state_mutex);
return m_state;
}

Expand All @@ -1105,7 +1094,7 @@ const std::shared_ptr<sync::SubscriptionStore>& SyncSession::get_flx_subscriptio
void SyncSession::update_configuration(SyncConfig new_config)
{
while (true) {
util::CheckedUniqueLock state_lock(m_state_mutex);
util::CheckedRecursiveLock state_lock(m_state_mutex);
if (m_state != State::Inactive) {
// Changing the state releases the lock, which means that by the
// time we reacquire the lock the state may have changed again
Expand Down Expand Up @@ -1168,7 +1157,7 @@ std::shared_ptr<SyncSession> SyncSession::existing_external_reference()

void SyncSession::did_drop_external_reference()
{
util::CheckedUniqueLock lock1(m_state_mutex);
util::CheckedRecursiveLock lock1(m_state_mutex);
{
util::CheckedLockGuard lock2(m_external_reference_mutex);

Expand Down
12 changes: 6 additions & 6 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void handle_error(SyncError) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_bad_auth(const std::shared_ptr<SyncUser>& user, std::error_code error_code,
const std::string& context_message) REQUIRES(!m_state_mutex, !m_config_mutex);
void cancel_pending_waits(util::CheckedUniqueLock, std::error_code) RELEASE(m_state_mutex);
void cancel_pending_waits(util::CheckedRecursiveLock, std::error_code) RELEASE(m_state_mutex);
enum class ShouldBackup { yes, no };
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t);
Expand All @@ -345,11 +345,12 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void did_drop_external_reference()
REQUIRES(!m_state_mutex, !m_config_mutex, !m_external_reference_mutex, !m_connection_state_mutex);
void detach_from_sync_manager() REQUIRES(!m_state_mutex, !m_connection_state_mutex);
void close(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_config_mutex, !m_connection_state_mutex);
void close(util::CheckedRecursiveLock) RELEASE(m_state_mutex)
REQUIRES(!m_config_mutex, !m_connection_state_mutex);

void become_active() REQUIRES(m_state_mutex, !m_config_mutex);
void become_dying(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_inactive(util::CheckedUniqueLock, std::error_code ec = {}) RELEASE(m_state_mutex)
void become_dying(util::CheckedRecursiveLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_inactive(util::CheckedRecursiveLock, std::error_code ec = {}) RELEASE(m_state_mutex)
REQUIRES(!m_connection_state_mutex);
void become_waiting_for_access_token() REQUIRES(m_state_mutex);

Expand All @@ -367,7 +368,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

void assert_mutex_unlocked() ASSERT_CAPABILITY(!m_state_mutex) ASSERT_CAPABILITY(!m_config_mutex) {}

mutable util::CheckedMutex m_state_mutex;
mutable util::CheckedRecursiveMutex m_state_mutex;
mutable util::CheckedMutex m_connection_state_mutex;

State m_state GUARDED_BY(m_state_mutex) = State::Inactive;
Expand All @@ -387,7 +388,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
DBRef m_client_reset_fresh_copy GUARDED_BY(m_state_mutex);
_impl::SyncClient& m_client;
SyncManager* m_sync_manager GUARDED_BY(m_state_mutex) = nullptr;
bool m_detaching_from_sync_manager GUARDED_BY(m_state_mutex) = false;

int64_t m_completion_request_counter GUARDED_BY(m_state_mutex) = 0;
CompletionCallbacks m_completion_callbacks GUARDED_BY(m_state_mutex);
Expand Down
Loading

0 comments on commit a8ba34b

Please sign in to comment.