Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition in notifier initialization #4234

Merged
merged 1 commit into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

### Fixed
* <How to hit and notice issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fix a race condition which would lead to "uncaught exception in notifier thread: N5realm15InvalidTableRefE: transaction_ended" and a crash when the source Realm was closed or invalidated at a very specific time during the first run of a collection notifier ([#3761](https://github.com/realm/realm-core/issues/3761), since v6.0.0).

### Breaking changes
* None.
Expand Down
11 changes: 8 additions & 3 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ bool DB::grow_reader_mapping(uint_fast32_t index)
}


DB::version_type DB::get_version_of_latest_snapshot()
VersionID DB::get_version_id_of_latest_snapshot()
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
// As get_version_of_latest_snapshot() may be called outside of the write
Expand All @@ -2170,17 +2170,22 @@ DB::version_type DB::get_version_of_latest_snapshot()
// while we read it.
const Ringbuffer::ReadCount& r = r_info->readers.get(index);
if (!atomic_double_inc_if_even(r.count)) {

continue;
}
version_type version = r.version;
VersionID version{r.version, index};
// release the entry again:
atomic_double_dec(r.count);
return version;
}
}


DB::version_type DB::get_version_of_latest_snapshot()
{
return get_version_id_of_latest_snapshot().version;
}


void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction)
{
SharedInfo* info = m_file_map.get_addr();
Expand Down
1 change: 1 addition & 0 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class DB : public std::enable_shared_from_this<DB> {

/// Returns the version of the latest snapshot.
version_type get_version_of_latest_snapshot();
VersionID get_version_id_of_latest_snapshot();

/// Thrown by start_read() if the specified version does not correspond to a
/// bound (AKA tethered) snapshot.
Expand Down
5 changes: 3 additions & 2 deletions src/realm/object-store/impl/list_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ void ListNotifier::do_attach_to(Transaction& sg)
m_list = obj.get_listbase_ptr(m_col);
}
catch (const KeyNotFound&) {
m_list = nullptr;
}
}

bool ListNotifier::do_add_required_change_info(TransactionChangeInfo& info)
{
if (!m_list->is_attached())
if (!m_list || !m_list->is_attached())
return false; // origin row was deleted after the notification was added

info.lists.push_back({m_table, m_obj.value, m_col.value, &m_change});
Expand All @@ -68,7 +69,7 @@ bool ListNotifier::do_add_required_change_info(TransactionChangeInfo& info)

void ListNotifier::run()
{
if (!m_list->is_attached()) {
if (!m_list || !m_list->is_attached()) {
// List was deleted, so report all of the rows being removed if this is
// the first run after that
if (m_prev_size) {
Expand Down
109 changes: 50 additions & 59 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,23 +757,15 @@ void RealmCoordinator::wait_for_change_release()
m_db->wait_for_change_release();
}

void RealmCoordinator::pin_version(VersionID versionid)
{
if (m_async_error)
return;
if (!m_advancer_sg || versionid < m_advancer_sg->get_version_of_current_transaction())
m_advancer_sg = m_db->start_read(versionid);
}

// Thread-safety analsys doesn't reasonably handle calling functions on different
// instances of this type
void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier) NO_THREAD_SAFETY_ANALYSIS
{
auto version = notifier->version();
auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
{
util::CheckedLockGuard lock(self.m_notifier_mutex);
self.pin_version(version);
if (!self.m_async_error)
notifier->attach_to(notifier->get_realm()->duplicate());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the core of the fix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this ended up being the only change actually needed to fix the bug. The changes in run_async_notifiers() are left over from a different attempt at fixing the bug that failed to fix it but did bring a functional benefit (of not running new notifiers with the lock held).

self.m_new_notifiers.push_back(std::move(notifier));
}
}
Expand Down Expand Up @@ -803,9 +795,7 @@ void RealmCoordinator::clean_up_dead_notifiers()
m_notifier_sg = nullptr;
m_notifier_skip_version = {0, 0};
}
if (swap_remove(m_new_notifiers) && m_new_notifiers.empty()) {
m_advancer_sg = nullptr;
}
swap_remove(m_new_notifiers);
}

void RealmCoordinator::on_change()
Expand Down Expand Up @@ -947,49 +937,70 @@ void RealmCoordinator::run_async_notifiers()
}

VersionID version;
auto skip_version = m_notifier_skip_version;
m_notifier_skip_version = {0, 0};

// Make a copy of the notifiers vector and then release the lock to avoid
// blocking other threads trying to register or unregister notifiers while we run them
decltype(m_notifiers) notifiers;
if (version != m_notifier_sg->get_version_of_current_transaction()) {
// We only want to rerun the existing notifiers if the version has changed.
// This is both a minor optimization and required for notification
// skipping to work. The skip logic assumes that the notifier can't be
// running when suppress_next() is called because it can only be called
// from within a write transaction, and starting the write transaction
// would have blocked until the notifier is done running. However,
// on_change() can be triggered by things other than writes, so we may
// be here even if the notifiers don't need to rerun.
notifiers = m_notifiers;
}

// Advance all of the new notifiers to the most recent version, if any
auto new_notifiers = std::move(m_new_notifiers);
IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers);
auto advancer_sg = std::move(m_advancer_sg);
m_new_notifiers.clear();
m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());

lock.unlock();

// Advance all of the new notifiers to the most recent version, if any
TransactionRef new_notifier_transaction;
util::Optional<IncrementalChangeInfo> new_notifier_change_info;
if (!new_notifiers.empty()) {
REALM_ASSERT(advancer_sg);
REALM_ASSERT_3(advancer_sg->get_version_of_current_transaction().version, <=,
new_notifiers.front()->version().version);

// The advancer SG can be at an older version than the oldest new notifier
// if a notifier was added and then removed before it ever got the chance
// to run, as we don't move the pin forward when removing dead notifiers
transaction::advance(*advancer_sg, nullptr, new_notifiers.front()->version());

// Advance each of the new notifiers to the latest version, attaching them
// to the SG at their handover version. This requires a unique
// TransactionChangeInfo for each source version, so that things don't
// see changes from before the version they were handed over from.
// Each Info has all of the changes between that source version and the
// next source version, and they'll be merged together later after
// releasing the lock
// Starting from the oldest notifier, incrementally advance the notifiers
// to the latest version, attaching each new notifier as we reach its
// source version. Suppose three new notifiers have been created:
// - Notifier A has a source version of 2
// - Notifier B has a source version of 7
// - Notifier C has a source version of 5
// Notifier A wants the changes from versions 2-latest, B wants 7-latest,
// and C wants 5-latest. We achieve this by starting at version 2 and
// attaching A, then advancing to version 5 (letting A gather changes
// from 2-5). We then attach C and advance to 7, then attach B and advance
// to the latest.
std::sort(new_notifiers.begin(), new_notifiers.end(), [](auto& a, auto& b) {
return a->version() < b->version();
});
new_notifier_transaction = m_db->start_read(new_notifiers.front()->version());

new_notifier_change_info.emplace(*new_notifier_transaction, new_notifiers);
for (auto& notifier : new_notifiers) {
new_notifier_change_info.advance_incremental(notifier->version());
notifier->attach_to(advancer_sg);
notifier->add_required_change_info(new_notifier_change_info.current());
new_notifier_change_info->advance_incremental(notifier->version());
notifier->attach_to(new_notifier_transaction);
notifier->add_required_change_info(new_notifier_change_info->current());
}
new_notifier_change_info.advance_to_final(VersionID{});
new_notifier_change_info->advance_to_final(VersionID{});

// We want to advance the non-new notifiers to the same version as the
// new notifiers to avoid having to merge changes from any new
// transaction that happen immediately after this into the new notifier
// changes
version = advancer_sg->get_version_of_current_transaction();
version = new_notifier_transaction->get_version_of_current_transaction();
}
else {
// If we have no new notifiers we want to just advance to the latest
// version, but we have to pick a "latest" version while holding the
// notifier lock to avoid advancing over a transaction which should be
// skipped
// FIXME: this is comically slow
version = m_db->start_read()->get_version_of_current_transaction();
version = m_db->get_version_id_of_latest_snapshot();
if (version == m_notifier_sg->get_version_of_current_transaction()) {
// We were spuriously woken up and there isn't actually anything to do
REALM_ASSERT(!m_notifier_skip_version.version);
Expand All @@ -998,26 +1009,6 @@ void RealmCoordinator::run_async_notifiers()
}
}

auto skip_version = m_notifier_skip_version;
m_notifier_skip_version = {0, 0};

// Make a copy of the notifiers vector and then release the lock to avoid
// blocking other threads trying to register or unregister notifiers while we run them
decltype(m_notifiers) notifiers;
if (version != m_notifier_sg->get_version_of_current_transaction()) {
// We only want to rerun the existing notifiers if the version has changed.
// This is both a minor optimization and required for notification
// skipping to work. The skip logic assumes that the notifier can't be
// running when suppress_next() is called because it can only be called
// from within a write transaction, and starting the write transaction
// would have blocked until the notifier is done running. However,
// on_change() can be triggered by things other than writes, so we may
// be here even if the notifiers don't need to rerun.
notifiers = m_notifiers;
}
m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
lock.unlock();

if (skip_version.version) {
REALM_ASSERT(!notifiers.empty());
REALM_ASSERT(version >= skip_version);
Expand Down
6 changes: 0 additions & 6 deletions src/realm/object-store/impl/realm_coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
// Will have a read transaction iff m_notifiers is non-empty
std::shared_ptr<Transaction> m_notifier_sg;

// Transaction used to advance notifiers in m_new_notifiers to the main shared
// group's transaction version
// Will have a read transaction iff m_new_notifiers is non-empty
std::shared_ptr<Transaction> m_advancer_sg;
std::exception_ptr m_async_error;

std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
Expand All @@ -248,8 +244,6 @@ class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {

void open_db();

void pin_version(VersionID version) REQUIRES(m_notifier_mutex);

void set_config(const Realm::Config&) REQUIRES(m_realm_mutex, !m_schema_cache_mutex);
void create_sync_session(bool force_client_resync);
std::shared_ptr<Realm> do_get_cached_realm(Realm::Config const& config,
Expand Down