Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the local changesets in-place for client reset recovery #7161

Merged
merged 9 commits into from
Dec 5, 2023
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Automatic client reset recovery now preserves the original division of changesets, rather than combining all unsynchronized changes into a single changeset ([PR #7161](https://github.com/realm/realm-core/pull/7161)).
* Automatic client reset recovery now does a better job of recovering changes when changesets were downloaded from the server after the unuploaded local changes were committed. If the local Realm happened to be fully up to date with the server prior to the client reset, automatic recovery should now always produce exactly the same state as if no client reset was involved ([PR #7161](https://github.com/realm/realm-core/pull/7161)).

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* Update existing std exceptions thrown by the Sync Client to use Realm exceptions. ([#6255](https://github.com/realm/realm-core/issues/6255), since v10.2.0)
* Automatic client reset recovery on flexible sync Realms would apply recovered changes in multiple write transactions, releasing the write lock in between. This had several observable negative effects:
- Other threads reading from the Realm while a client reset was in progress could observe invalid mid-reset state.
- Other threads could potentially write in the middle of a client reset, resulting in history diverging from the server.
- The change notifications produced by client resets were not minimal and would report that some things changed which actually didn't.
- All pending subscriptions were marked as Superseded and then recreating, resulting in anything waiting for subscriptions to complete firing early.
([PR #7161](https://github.com/realm/realm-core/pull/7161), since v12.3.0).
* If the very first open of a flexible sync Realm triggered a client reset, the configuration had an initial subscriptions callback, both before and after reset callbacks, and the initial subscription callback began a read transaction without ending it (which is normally going to be the case), opening the frozen Realm for the after reset callback would trigger a BadVersion exception ([PR #7161](https://github.com/realm/realm-core/pull/7161), since v12.3.0).


### Breaking changes
* Update existing std exceptions thrown by the Sync Client to use Realm exceptions. ([PR #7141](https://github.com/realm/realm-core/pull/7141/files))
Expand Down
4 changes: 3 additions & 1 deletion src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,10 @@ void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>
if (!first_time_open)
first_time_open = db_created;
if (subscription_version == 0 || (first_time_open && rerun_on_open)) {
// if the tasks is cancelled, the subscription may or may not be run.
bool was_in_read = realm->is_in_read_transaction();
subscription_function(realm);
if (!was_in_read)
realm->invalidate();
}
}
#endif
Expand Down
88 changes: 36 additions & 52 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,49 +510,45 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
auto fresh_sub = fresh_sub_store->get_latest();
// The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
if (auto local_subs_store = m_flx_subscription_store) {
sync::SubscriptionSet active = local_subs_store->get_active();
auto fresh_mut_sub = fresh_sub.make_mutable_copy();
fresh_mut_sub.import(active);
fresh_mut_sub.import(local_subs_store->get_active());
fresh_sub = fresh_mut_sub.commit();
}
fresh_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete)
.then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State state) {

auto self = shared_from_this();
using SubscriptionState = sync::SubscriptionSet::State;
fresh_sub.get_state_change_notification(SubscriptionState::Complete)
.then([=](SubscriptionState) -> util::Future<sync::SubscriptionSet> {
if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
return util::Future<sync::SubscriptionSet::State>::make_ready(state);
return fresh_sub;
}
auto strong_self = weak_self.lock();
if (!strong_self || !strong_self->m_migration_store->is_migration_in_progress()) {
return util::Future<sync::SubscriptionSet::State>::make_ready(state);
if (!self->m_migration_store->is_migration_in_progress()) {
return fresh_sub;
}

// fresh_sync_session is using a new realm file that doesn't have the migration_store info
// so the query string from the local migration store will need to be provided
auto query_string = strong_self->m_migration_store->get_query_string();
auto query_string = self->m_migration_store->get_query_string();
REALM_ASSERT(query_string);
// Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
// message.
fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
auto latest_subs = fresh_sub_store->get_latest();
{
util::CheckedLockGuard lock(strong_self->m_state_mutex);
// Save a copy of the subscriptions so we add them to the local realm once the
// subscription store is created.
strong_self->m_active_subscriptions_after_migration = latest_subs;
}

return latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
return fresh_sub_store->get_latest()
.get_state_change_notification(SubscriptionState::Complete)
.then([=](SubscriptionState) {
return fresh_sub_store->get_latest();
});
})
.get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
.get_async([=](StatusWith<sync::SubscriptionSet>&& subs) {
// Keep the sync session alive while it's downloading, but then close
// it immediately
fresh_sync_session->force_close();
if (auto strong_self = weak_self.lock()) {
if (s.is_ok()) {
strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action);
}
else {
strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status(), server_requests_action);
}
if (subs.is_ok()) {
self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action,
std::move(subs.get_value()));
}
else {
self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), server_requests_action);
}
});
}
Expand All @@ -570,7 +566,8 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
}

void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action)
sync::ProtocolErrorInfo::Action server_requests_action,
std::optional<sync::SubscriptionSet> new_subs)
{
util::CheckedUniqueLock lock(m_state_mutex);
if (m_state != State::Active) {
Expand Down Expand Up @@ -625,7 +622,7 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
apply_sync_config_after_migration_or_rollback();
auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
update_subscription_store(flx_sync_requested);
update_subscription_store(flx_sync_requested, std::move(new_subs));
}
}
revive_if_needed();
Expand Down Expand Up @@ -843,12 +840,13 @@ static sync::Session::Config::ClientReset make_client_reset_config(const RealmCo
// to the user here. Note that the schema changes made here will be considered
// an "offline write" to be recovered if this is recovery mode.
auto before = Realm::get_shared_realm(config);
before->read_group();
if (auto& notify_before = config.sync_config->notify_before_client_reset) {
notify_before(config.sync_config->freeze_before_reset_realm ? before->freeze() : before);
}
// Note that if the SDK requested a live Realm this may be a different
// version than what we had before calling the callback.
// Note that if the SDK wrote to the Realm (hopefully by requesting a
// live instance and not opening a secondary one), this may be a
// different version than what we had before calling the callback.
before->refresh();
return before->read_transaction_version();
};

Expand Down Expand Up @@ -1320,7 +1318,7 @@ void SyncSession::save_sync_config_after_migration_or_rollback()
m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
}

void SyncSession::update_subscription_store(bool flx_sync_requested)
void SyncSession::update_subscription_store(bool flx_sync_requested, std::optional<sync::SubscriptionSet> new_subs)
{
util::CheckedUniqueLock lock(m_state_mutex);

Expand Down Expand Up @@ -1350,11 +1348,15 @@ void SyncSession::update_subscription_store(bool flx_sync_requested)
create_subscription_store();

std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
lock.unlock();

// If migrated to FLX, create subscriptions in the local realm to cover the existing data.
// This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
make_active_subscription_set();
if (new_subs) {
auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
active_mut_sub.import(std::move(*new_subs));
active_mut_sub.set_state(sync::SubscriptionSet::State::Complete);
active_mut_sub.commit();
}

auto tr = m_db->start_write();
set_write_validator_factory(weak_sub_mgr);
Expand Down Expand Up @@ -1601,21 +1603,3 @@ util::Future<std::string> SyncSession::send_test_command(std::string body)

return m_session->send_test_command(std::move(body));
}

void SyncSession::make_active_subscription_set()
{
util::CheckedUniqueLock lock(m_state_mutex);

if (!m_active_subscriptions_after_migration)
return;

REALM_ASSERT(m_flx_subscription_store);

// Create subscription set from the subscriptions used to download the fresh realm after migration.
auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
active_mut_sub.import(*m_active_subscriptions_after_migration);
active_mut_sub.update_state(sync::SubscriptionSet::State::Complete);
active_mut_sub.commit();

m_active_subscriptions_after_migration.reset();
}
12 changes: 5 additions & 7 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.send_test_command(std::move(request));
}

static sync::SaltedFileIdent get_file_ident(SyncSession& session)
static sync::SaltedFileIdent get_file_ident(const SyncSession& session)
{
return session.get_file_ident();
}
Expand Down Expand Up @@ -365,7 +365,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
SyncSession(_impl::SyncClient&, std::shared_ptr<DB>, const RealmConfig&, SyncManager* sync_manager);

// Initialize or tear down the subscription store based on whether or not flx_sync_requested is true
void update_subscription_store(bool flx_sync_requested) REQUIRES(!m_state_mutex);
void update_subscription_store(bool flx_sync_requested, std::optional<sync::SubscriptionSet> new_subs)
REQUIRES(!m_state_mutex);
void create_subscription_store() REQUIRES(m_state_mutex);
void set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr);
// Update the sync config after a PBS->FLX migration or FLX->PBS rollback occurs
Expand All @@ -375,7 +376,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
REQUIRES(!m_config_mutex, !m_state_mutex, !m_connection_state_mutex);
void handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action)
sync::ProtocolErrorInfo::Action server_requests_action,
std::optional<sync::SubscriptionSet> new_subs = std::nullopt)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
Expand Down Expand Up @@ -433,9 +435,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

void assert_mutex_unlocked() ASSERT_CAPABILITY(!m_state_mutex) ASSERT_CAPABILITY(!m_config_mutex) {}

// Create active subscription set after PBS -> FLX migration to cover the data.
void make_active_subscription_set() REQUIRES(!m_state_mutex);

// Return the subscription_store_base - to be used only for testing
std::shared_ptr<sync::SubscriptionStore> get_subscription_store_base() REQUIRES(!m_state_mutex);

Expand All @@ -458,7 +457,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
// m_flx_subscription_store will either point to m_subscription_store_base if currently using FLX
// or set to nullptr if currently using PBS (mutable for client PBS->FLX migration)
std::shared_ptr<sync::SubscriptionStore> m_flx_subscription_store GUARDED_BY(m_state_mutex);
std::optional<sync::SubscriptionSet> m_active_subscriptions_after_migration GUARDED_BY(m_state_mutex);
// Original sync config for reverting back to PBS if FLX migration is rolled back
const std::shared_ptr<SyncConfig> m_original_sync_config; // does not change after construction
std::shared_ptr<SyncConfig> m_migrated_sync_config GUARDED_BY(m_config_mutex);
Expand Down
4 changes: 3 additions & 1 deletion src/realm/sync/changeset_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ void ChangesetEncoder::append_value(Decimal128 id)
auto ChangesetEncoder::release() noexcept -> Buffer
{
m_intern_strings_rev.clear();
return std::move(m_buffer);
Buffer buffer;
std::swap(buffer, m_buffer);
return buffer;
}

void ChangesetEncoder::reset() noexcept
Expand Down
12 changes: 3 additions & 9 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1176,9 +1176,7 @@ bool SessionWrapper::has_flx_subscription_store() const
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
{
REALM_ASSERT(!m_finalized);
auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
mut_subs.commit();
get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
}

void SessionWrapper::on_flx_sync_version_complete(int64_t version)
Expand Down Expand Up @@ -1227,9 +1225,7 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
break;
}

auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
mut_subs.update_state(new_state);
mut_subs.commit();
get_flx_subscription_store()->update_state(new_version, new_state);
}

SubscriptionStore* SessionWrapper::get_flx_subscription_store()
Expand Down Expand Up @@ -1687,9 +1683,7 @@ void SessionWrapper::on_download_completion()
if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
m_flx_pending_mark_version);
auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
mutable_subs.update_state(SubscriptionSet::State::Complete);
mutable_subs.commit();
m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
}

Expand Down
4 changes: 1 addition & 3 deletions src/realm/sync/instruction_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -941,11 +941,9 @@ bool InstructionApplier::check_links_exist(const Instruction::Payload& payload)
[&](InternString interned_pk) {
return Mixed{get_string(interned_pk)};
},
[&](GlobalKey) {
[&](GlobalKey) -> Mixed {
bad_transaction_log(
"Unexpected link to embedded object while validating a primary key");
return Mixed{}; // appease the compiler; visitors must have a single
// return type
},
[&](ObjectId pk) {
return Mixed{pk};
Expand Down
Loading