diff --git a/CHANGELOG.md b/CHANGELOG.md index 201f1a75d84..85f2a0cc03b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,20 @@ ### Enhancements * (PR [#????](https://github.com/realm/realm-core/pull/????)) -* None. +* Automatic client reset recovery now preserves the original division of changesets, rather than combining all unsynchronized changes into a single changeset ([PR #7161](https://github.com/realm/realm-core/pull/7161)). +* Automatic client reset recovery now does a better job of recovering changes when changesets were downloaded from the server after the unuploaded local changes were committed. If the local Realm happened to be fully up to date with the server prior to the client reset, automatic recovery should now always produce exactly the same state as if no client reset was involved ([PR #7161](https://github.com/realm/realm-core/pull/7161)). ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) * Update existing std exceptions thrown by the Sync Client to use Realm exceptions. ([#6255](https://github.com/realm/realm-core/issues/6255), since v10.2.0) +* Automatic client reset recovery on flexible sync Realms would apply recovered changes in multiple write transactions, releasing the write lock in between. This had several observable negative effects: + - Other threads reading from the Realm while a client reset was in progress could observe invalid mid-reset state. + - Other threads could potentially write in the middle of a client reset, resulting in history diverging from the server. + - The change notifications produced by client resets were not minimal and would report that some things changed which actually didn't. + - All pending subscriptions were marked as Superseded and then recreating, resulting in anything waiting for subscriptions to complete firing early. + ([PR #7161](https://github.com/realm/realm-core/pull/7161), since v12.3.0). +* If the very first open of a flexible sync Realm triggered a client reset, the configuration had an initial subscriptions callback, both before and after reset callbacks, and the initial subscription callback began a read transaction without ending it (which is normally going to be the case), opening the frozen Realm for the after reset callback would trigger a BadVersion exception ([PR #7161](https://github.com/realm/realm-core/pull/7161), since v12.3.0). + ### Breaking changes * Update existing std exceptions thrown by the Sync Client to use Realm exceptions. ([PR #7141](https://github.com/realm/realm-core/pull/7141/files)) diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index 69c2f674077..782b5bf27a8 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -389,8 +389,10 @@ void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr if (!first_time_open) first_time_open = db_created; if (subscription_version == 0 || (first_time_open && rerun_on_open)) { - // if the tasks is cancelled, the subscription may or may not be run. + bool was_in_read = realm->is_in_read_transaction(); subscription_function(realm); + if (!was_in_read) + realm->invalidate(); } } #endif diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 98336d6a5e0..aa08066fb82 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -510,49 +510,45 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re auto fresh_sub = fresh_sub_store->get_latest(); // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm. if (auto local_subs_store = m_flx_subscription_store) { - sync::SubscriptionSet active = local_subs_store->get_active(); auto fresh_mut_sub = fresh_sub.make_mutable_copy(); - fresh_mut_sub.import(active); + fresh_mut_sub.import(local_subs_store->get_active()); fresh_sub = fresh_mut_sub.commit(); } - fresh_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete) - .then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State state) { + + auto self = shared_from_this(); + using SubscriptionState = sync::SubscriptionSet::State; + fresh_sub.get_state_change_notification(SubscriptionState::Complete) + .then([=](SubscriptionState) -> util::Future { if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) { - return util::Future::make_ready(state); + return fresh_sub; } - auto strong_self = weak_self.lock(); - if (!strong_self || !strong_self->m_migration_store->is_migration_in_progress()) { - return util::Future::make_ready(state); + if (!self->m_migration_store->is_migration_in_progress()) { + return fresh_sub; } // fresh_sync_session is using a new realm file that doesn't have the migration_store info // so the query string from the local migration store will need to be provided - auto query_string = strong_self->m_migration_store->get_query_string(); + auto query_string = self->m_migration_store->get_query_string(); REALM_ASSERT(query_string); // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap // message. fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string); - auto latest_subs = fresh_sub_store->get_latest(); - { - util::CheckedLockGuard lock(strong_self->m_state_mutex); - // Save a copy of the subscriptions so we add them to the local realm once the - // subscription store is created. - strong_self->m_active_subscriptions_after_migration = latest_subs; - } - - return latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete); + return fresh_sub_store->get_latest() + .get_state_change_notification(SubscriptionState::Complete) + .then([=](SubscriptionState) { + return fresh_sub_store->get_latest(); + }); }) - .get_async([=, weak_self = weak_from_this()](StatusWith s) { + .get_async([=](StatusWith&& subs) { // Keep the sync session alive while it's downloading, but then close // it immediately fresh_sync_session->force_close(); - if (auto strong_self = weak_self.lock()) { - if (s.is_ok()) { - strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action); - } - else { - strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status(), server_requests_action); - } + if (subs.is_ok()) { + self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action, + std::move(subs.get_value())); + } + else { + self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), server_requests_action); } }); } @@ -570,7 +566,8 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re } void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status, - sync::ProtocolErrorInfo::Action server_requests_action) + sync::ProtocolErrorInfo::Action server_requests_action, + std::optional new_subs) { util::CheckedUniqueLock lock(m_state_mutex); if (m_state != State::Active) { @@ -625,7 +622,7 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status, server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) { apply_sync_config_after_migration_or_rollback(); auto flx_sync_requested = config(&SyncConfig::flx_sync_requested); - update_subscription_store(flx_sync_requested); + update_subscription_store(flx_sync_requested, std::move(new_subs)); } } revive_if_needed(); @@ -843,12 +840,13 @@ static sync::Session::Config::ClientReset make_client_reset_config(const RealmCo // to the user here. Note that the schema changes made here will be considered // an "offline write" to be recovered if this is recovery mode. auto before = Realm::get_shared_realm(config); - before->read_group(); if (auto& notify_before = config.sync_config->notify_before_client_reset) { notify_before(config.sync_config->freeze_before_reset_realm ? before->freeze() : before); } - // Note that if the SDK requested a live Realm this may be a different - // version than what we had before calling the callback. + // Note that if the SDK wrote to the Realm (hopefully by requesting a + // live instance and not opening a secondary one), this may be a + // different version than what we had before calling the callback. + before->refresh(); return before->read_transaction_version(); }; @@ -1320,7 +1318,7 @@ void SyncSession::save_sync_config_after_migration_or_rollback() m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config); } -void SyncSession::update_subscription_store(bool flx_sync_requested) +void SyncSession::update_subscription_store(bool flx_sync_requested, std::optional new_subs) { util::CheckedUniqueLock lock(m_state_mutex); @@ -1350,11 +1348,15 @@ void SyncSession::update_subscription_store(bool flx_sync_requested) create_subscription_store(); std::weak_ptr weak_sub_mgr(m_flx_subscription_store); - lock.unlock(); // If migrated to FLX, create subscriptions in the local realm to cover the existing data. // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors. - make_active_subscription_set(); + if (new_subs) { + auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy(); + active_mut_sub.import(std::move(*new_subs)); + active_mut_sub.set_state(sync::SubscriptionSet::State::Complete); + active_mut_sub.commit(); + } auto tr = m_db->start_write(); set_write_validator_factory(weak_sub_mgr); @@ -1601,21 +1603,3 @@ util::Future SyncSession::send_test_command(std::string body) return m_session->send_test_command(std::move(body)); } - -void SyncSession::make_active_subscription_set() -{ - util::CheckedUniqueLock lock(m_state_mutex); - - if (!m_active_subscriptions_after_migration) - return; - - REALM_ASSERT(m_flx_subscription_store); - - // Create subscription set from the subscriptions used to download the fresh realm after migration. - auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy(); - active_mut_sub.import(*m_active_subscriptions_after_migration); - active_mut_sub.update_state(sync::SubscriptionSet::State::Complete); - active_mut_sub.commit(); - - m_active_subscriptions_after_migration.reset(); -} diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index d6035cc3af4..be15d157202 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -305,7 +305,7 @@ class SyncSession : public std::enable_shared_from_this { return session.send_test_command(std::move(request)); } - static sync::SaltedFileIdent get_file_ident(SyncSession& session) + static sync::SaltedFileIdent get_file_ident(const SyncSession& session) { return session.get_file_ident(); } @@ -365,7 +365,8 @@ class SyncSession : public std::enable_shared_from_this { SyncSession(_impl::SyncClient&, std::shared_ptr, const RealmConfig&, SyncManager* sync_manager); // Initialize or tear down the subscription store based on whether or not flx_sync_requested is true - void update_subscription_store(bool flx_sync_requested) REQUIRES(!m_state_mutex); + void update_subscription_store(bool flx_sync_requested, std::optional new_subs) + REQUIRES(!m_state_mutex); void create_subscription_store() REQUIRES(m_state_mutex); void set_write_validator_factory(std::weak_ptr weak_sub_mgr); // Update the sync config after a PBS->FLX migration or FLX->PBS rollback occurs @@ -375,7 +376,8 @@ class SyncSession : public std::enable_shared_from_this { void download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action) REQUIRES(!m_config_mutex, !m_state_mutex, !m_connection_state_mutex); void handle_fresh_realm_downloaded(DBRef db, Status status, - sync::ProtocolErrorInfo::Action server_requests_action) + sync::ProtocolErrorInfo::Action server_requests_action, + std::optional new_subs = std::nullopt) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex); void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex); void handle_bad_auth(const std::shared_ptr& user, Status status) @@ -433,9 +435,6 @@ class SyncSession : public std::enable_shared_from_this { void assert_mutex_unlocked() ASSERT_CAPABILITY(!m_state_mutex) ASSERT_CAPABILITY(!m_config_mutex) {} - // Create active subscription set after PBS -> FLX migration to cover the data. - void make_active_subscription_set() REQUIRES(!m_state_mutex); - // Return the subscription_store_base - to be used only for testing std::shared_ptr get_subscription_store_base() REQUIRES(!m_state_mutex); @@ -458,7 +457,6 @@ class SyncSession : public std::enable_shared_from_this { // m_flx_subscription_store will either point to m_subscription_store_base if currently using FLX // or set to nullptr if currently using PBS (mutable for client PBS->FLX migration) std::shared_ptr m_flx_subscription_store GUARDED_BY(m_state_mutex); - std::optional m_active_subscriptions_after_migration GUARDED_BY(m_state_mutex); // Original sync config for reverting back to PBS if FLX migration is rolled back const std::shared_ptr m_original_sync_config; // does not change after construction std::shared_ptr m_migrated_sync_config GUARDED_BY(m_config_mutex); diff --git a/src/realm/sync/changeset_encoder.cpp b/src/realm/sync/changeset_encoder.cpp index 2f89739447a..ceeecd4442d 100644 --- a/src/realm/sync/changeset_encoder.cpp +++ b/src/realm/sync/changeset_encoder.cpp @@ -403,7 +403,9 @@ void ChangesetEncoder::append_value(Decimal128 id) auto ChangesetEncoder::release() noexcept -> Buffer { m_intern_strings_rev.clear(); - return std::move(m_buffer); + Buffer buffer; + std::swap(buffer, m_buffer); + return buffer; } void ChangesetEncoder::reset() noexcept diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 38df01a559c..b91aaaced66 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1176,9 +1176,7 @@ bool SessionWrapper::has_flx_subscription_store() const void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg) { REALM_ASSERT(!m_finalized); - auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version); - mut_subs.update_state(SubscriptionSet::State::Error, err_msg); - mut_subs.commit(); + get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg); } void SessionWrapper::on_flx_sync_version_complete(int64_t version) @@ -1227,9 +1225,7 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat break; } - auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version); - mut_subs.update_state(new_state); - mut_subs.commit(); + get_flx_subscription_store()->update_state(new_version, new_state); } SubscriptionStore* SessionWrapper::get_flx_subscription_store() @@ -1687,9 +1683,7 @@ void SessionWrapper::on_download_completion() if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) { m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message", m_flx_pending_mark_version); - auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version); - mutable_subs.update_state(SubscriptionSet::State::Complete); - mutable_subs.commit(); + m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete); m_flx_pending_mark_version = SubscriptionSet::EmptyVersion; } diff --git a/src/realm/sync/instruction_applier.cpp b/src/realm/sync/instruction_applier.cpp index 4ebb02f2e6f..34f1b599384 100644 --- a/src/realm/sync/instruction_applier.cpp +++ b/src/realm/sync/instruction_applier.cpp @@ -941,11 +941,9 @@ bool InstructionApplier::check_links_exist(const Instruction::Payload& payload) [&](InternString interned_pk) { return Mixed{get_string(interned_pk)}; }, - [&](GlobalKey) { + [&](GlobalKey) -> Mixed { bad_transaction_log( "Unexpected link to embedded object while validating a primary key"); - return Mixed{}; // appease the compiler; visitors must have a single - // return type }, [&](ObjectId pk) { return Mixed{pk}; diff --git a/src/realm/sync/instructions.hpp b/src/realm/sync/instructions.hpp index ad8d18b2f6b..2afa39ec3c5 100644 --- a/src/realm/sync/instructions.hpp +++ b/src/realm/sync/instructions.hpp @@ -734,8 +734,8 @@ struct Instruction { const Instruction& at(size_t) const noexcept; private: - template - struct Visitor; + template + static decltype(auto) visit(F&& lambda, V&& instr); }; inline const char* get_type_name(Instruction::Type type) @@ -934,47 +934,17 @@ Instruction::Instruction(T instr) static_assert(!std::is_same_v); } -template -struct Instruction::Visitor { - F lambda; // reference type - Visitor(F lambda) - : lambda(lambda) - { - } - - template - decltype(auto) operator()(T& instr) - { - return lambda(instr); - } - - template - decltype(auto) operator()(const T& instr) - { - return lambda(instr); - } - - auto operator()(const Instruction::Vector&) -> decltype(lambda(std::declval())) - { - REALM_TERMINATE("visiting instruction vector"); - } - auto operator()(Instruction::Vector&) -> decltype(lambda(std::declval())) - { - REALM_TERMINATE("visiting instruction vector"); - } -}; - -template -inline decltype(auto) Instruction::visit(F&& lambda) +template +inline decltype(auto) Instruction::visit(F&& lambda, V&& instr) { // Cannot use std::visit, because it does not pass lvalue references to the visitor. - if (mpark::holds_alternative(m_instr)) { + if (mpark::holds_alternative(instr)) { REALM_TERMINATE("visiting instruction vector"); } #define REALM_VISIT_VARIANT(X) \ - else if (mpark::holds_alternative(m_instr)) \ + else if (auto ptr = mpark::get_if(&instr)) \ { \ - return lambda(mpark::get(m_instr)); \ + return lambda(*ptr); \ } REALM_FOR_EACH_INSTRUCTION_TYPE(REALM_VISIT_VARIANT) #undef REALM_VISIT_VARIANT @@ -984,24 +954,16 @@ inline decltype(auto) Instruction::visit(F&& lambda) } } +template +inline decltype(auto) Instruction::visit(F&& lambda) +{ + return visit(std::forward(lambda), m_instr); +} + template inline decltype(auto) Instruction::visit(F&& lambda) const { - // Cannot use std::visit, because it does not pass lvalue references to the visitor. - if (mpark::holds_alternative(m_instr)) { - REALM_TERMINATE("visiting instruction vector"); - } -#define REALM_VISIT_VARIANT(X) \ - else if (mpark::holds_alternative(m_instr)) \ - { \ - return lambda(mpark::get(m_instr)); \ - } - REALM_FOR_EACH_INSTRUCTION_TYPE(REALM_VISIT_VARIANT) -#undef REALM_VISIT_VARIANT - else - { - REALM_TERMINATE("Unhandled instruction variant entry"); - } + return visit(std::forward(lambda), m_instr); } inline Instruction::Type Instruction::type() const noexcept diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 30b8e41fe93..18695c360c2 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -16,18 +16,19 @@ // //////////////////////////////////////////////////////////////////////////// -#include "realm/util/functional.hpp" #include -#include -#include -#include #include #include #include #include #include +#include #include +#include +#include +#include +#include #include #include @@ -37,44 +38,78 @@ namespace realm::sync { -void ClientHistory::set_client_file_ident_in_wt(version_type current_version, SaltedFileIdent client_file_ident) +void ClientHistory::set_client_reset_adjustments( + util::Logger& logger, version_type current_version, SaltedFileIdent client_file_ident, + SaltedVersion server_version, const std::vector<_impl::client_reset::RecoveredChange>& recovered_changesets) { ensure_updated(current_version); // Throws prepare_for_write(); // Throws + version_type client_version = m_sync_history_base_version + sync_history_size(); + REALM_ASSERT(client_version == current_version); // For now Array& root = m_arrays->root; m_group->set_sync_file_id(client_file_ident.ident); // Throws - root.set(s_client_file_ident_salt_iip, - RefOrTagged::make_tagged(client_file_ident.salt)); // Throws -} + size_t uploadable_bytes = 0; + if (recovered_changesets.empty()) { + // Either we had nothing to upload or we're discarding the unsynced changes + logger.debug("Client reset adjustments: discarding %1 history entries", sync_history_size()); + do_trim_sync_history(sync_history_size()); // Throws + } + else { + // Discard all sync history before the first recovered changeset. This is + // required because we are going to discard our progress information and + // so won't know which history entries have been uploaded already. + auto first_version = recovered_changesets.front().version; + REALM_ASSERT(first_version >= m_sync_history_base_version); + auto discard_count = std::size_t(first_version - m_sync_history_base_version); + do_trim_sync_history(discard_count); + + if (logger.would_log(util::Logger::Level::debug)) { + logger.debug("Client reset adjustments: trimming %1 history entries and updating %2 of %3 remaining " + "history entries:", + discard_count, recovered_changesets.size(), sync_history_size()); + for (size_t i = 0, size = m_arrays->changesets.size(); i < size; ++i) { + logger.debug("- %1: ident(%2) changeset_size(%3) remote_version(%4)", i, + m_arrays->origin_file_idents.get(i), m_arrays->changesets.get(i).size(), + m_arrays->remote_versions.get(i)); + } + } -void ClientHistory::set_client_reset_adjustments(version_type current_version, SaltedFileIdent client_file_ident, - SaltedVersion server_version, BinaryData uploadable_changeset) -{ - ensure_updated(current_version); // Throws - prepare_for_write(); // Throws + util::compression::CompressMemoryArena arena; + util::AppendBuffer compressed; + for (auto& [changeset, version] : recovered_changesets) { + uploadable_bytes += changeset.size(); + auto i = size_t(version - m_sync_history_base_version); + util::compression::allocate_and_compress_nonportable(arena, changeset, compressed); + m_arrays->changesets.set(i, BinaryData{compressed.data(), compressed.size()}); // Throws + m_arrays->remote_versions.set(i, server_version.version); + m_arrays->reciprocal_transforms.set(i, BinaryData()); + logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version, + compressed.size(), server_version.version); + } + } + logger.debug("New uploadable bytes after client reset adjustment: %1", uploadable_bytes); + + // Client progress versions are set to 0 to signal to the server that we've + // reset our versioning. If we send the actual values, the server would + // complain that the versions (probably) don't correspond to the ones sent + // when downloading the fresh realm. + root.set(s_progress_download_client_version_iip, + RefOrTagged::make_tagged(0)); // Throws + root.set(s_progress_upload_client_version_iip, + RefOrTagged::make_tagged(0)); // Throws - version_type client_version = m_sync_history_base_version + sync_history_size(); - REALM_ASSERT(client_version == current_version); // For now - DownloadCursor download_progress = {server_version.version, 0}; - UploadCursor upload_progress = {0, 0}; - Array& root = m_arrays->root; - m_group->set_sync_file_id(client_file_ident.ident); // Throws root.set(s_client_file_ident_salt_iip, RefOrTagged::make_tagged(client_file_ident.salt)); // Throws root.set(s_progress_download_server_version_iip, - RefOrTagged::make_tagged(download_progress.server_version)); // Throws - root.set(s_progress_download_client_version_iip, - RefOrTagged::make_tagged(download_progress.last_integrated_client_version)); // Throws + RefOrTagged::make_tagged(server_version.version)); // Throws root.set(s_progress_latest_server_version_iip, RefOrTagged::make_tagged(server_version.version)); // Throws root.set(s_progress_latest_server_version_salt_iip, RefOrTagged::make_tagged(server_version.salt)); // Throws - root.set(s_progress_upload_client_version_iip, - RefOrTagged::make_tagged(upload_progress.client_version)); // Throws root.set(s_progress_upload_server_version_iip, - RefOrTagged::make_tagged(upload_progress.last_integrated_server_version)); // Throws + RefOrTagged::make_tagged(server_version.version)); // Throws root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(0)); // Throws root.set(s_progress_downloadable_bytes_iip, @@ -82,13 +117,10 @@ void ClientHistory::set_client_reset_adjustments(version_type current_version, S root.set(s_progress_uploaded_bytes_iip, RefOrTagged::make_tagged(0)); // Throws root.set(s_progress_uploadable_bytes_iip, - RefOrTagged::make_tagged(0)); // Throws - - // Discard existing synchronization history - do_trim_sync_history(sync_history_size()); // Throws + RefOrTagged::make_tagged(uploadable_bytes)); // Throws - m_progress_download = download_progress; - m_client_reset_changeset = uploadable_changeset; // Picked up by prepare_changeset() + m_progress_download = {server_version.version, 0}; + m_applying_client_reset = true; } std::vector ClientHistory::get_local_changes(version_type current_version) const @@ -116,7 +148,13 @@ std::vector ClientHistory::get_local_changes(version std::int_fast64_t origin_file_ident = m_arrays->origin_file_idents.get(ndx); bool not_from_server = (origin_file_ident == 0); if (not_from_server) { - changesets.push_back({version, m_arrays->changesets.get(ndx)}); + bool compressed = false; + // find_sync_history_entry() returns 0 to indicate not found and + // otherwise adds 1 to the version, and then get_reciprocal_transform() + // subtracts 1 from the version + if (auto changeset = get_reciprocal_transform(version + 1, compressed); changeset.size()) { + changesets.push_back({version, changeset}); + } } } return changesets; @@ -269,20 +307,6 @@ void ClientHistory::get_status(version_type& current_client_version, SaltedFileI } } -SaltedFileIdent ClientHistory::get_client_file_ident(const Transaction& rt) const -{ - SaltedFileIdent client_file_ident{rt.get_sync_file_id(), 0}; - - using gf = _impl::GroupFriend; - if (ref_type ref = gf::get_history_ref(rt)) { - Array root(m_db->get_alloc()); - root.init_from_ref(ref); - client_file_ident.salt = salt_type(root.get_as_ref_or_tagged(s_client_file_ident_salt_iip).get_as_int()); - } - - return client_file_ident; -} - void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids) { REALM_ASSERT(client_file_ident.ident != 0); @@ -524,7 +548,7 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Spanget_transact_stage() == DB::transact_Writing); if (!m_replication.apply_server_changes()) { - std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset c) { + std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset& c) { downloaded_bytes += c.original_changeset_size; }); // Skip over all changesets if they don't need to be transformed and applied. @@ -737,7 +761,7 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset, ct_changeset = BinaryData("", 0); m_arrays->ct_history.add(ct_changeset); // Throws - REALM_ASSERT(!m_applying_server_changeset || !m_client_reset_changeset); + REALM_ASSERT(!m_applying_server_changeset || !m_applying_client_reset); // If we're applying a changeset from the server then we should have already // added the history entry and don't need to do so here @@ -751,21 +775,19 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset, return m_ct_history_base_version + ct_history_size(); } - BinaryData changeset = sync_changeset; - if (m_client_reset_changeset) { - // When performing a client reset, `sync_changeset` is generated from - // the operations performed to bring the local Realm in sync with the - // server, so we don't want to send that to the server. Instead we - // send m_client_reset_changeset which is the recovered local writes - // (or null in discard local mode). - changeset = *std::exchange(m_client_reset_changeset, util::none); + // We don't generate a changeset for any of the changes made as part of + // applying a client reset as those changes are just bringing us into + // alignment with the new server state + if (m_applying_client_reset) { + m_applying_client_reset = false; + sync_changeset = {}; } HistoryEntry entry; entry.origin_timestamp = m_local_origin_timestamp_source(); entry.origin_file_ident = 0; // Of local origin entry.remote_version = m_progress_download.server_version; - entry.changeset = changeset; + entry.changeset = sync_changeset; add_sync_history_entry(entry); // Throws // uploadable_bytes is updated at every local Realm change. The total @@ -920,7 +942,7 @@ void ClientHistory::trim_ct_history() // Definition: An *upload skippable history entry* is one whose changeset is // either empty, or of remote origin. // -// Then, a history entry, E, can be trimmed away if it preceeds C, or E is +// Then, a history entry, E, can be trimmed away if it precedes C, or E is // upload skippable, and there are no upload nonskippable entries between C and // E. // @@ -1004,8 +1026,14 @@ void ClientHistory::do_trim_sync_history(std::size_t n) REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size()); REALM_ASSERT(n <= sync_history_size()); - if (n > 0) { - // FIXME: shouldn't this be using truncate()? + if (n == sync_history_size()) { + m_arrays->changesets.clear(); + m_arrays->reciprocal_transforms.clear(); + m_arrays->remote_versions.clear(); + m_arrays->origin_file_idents.clear(); + m_arrays->origin_timestamps.clear(); + } + else if (n > 0) { for (std::size_t i = 0; i < n; ++i) { std::size_t j = (n - 1) - i; m_arrays->changesets.erase(j); // Throws @@ -1026,9 +1054,9 @@ void ClientHistory::do_trim_sync_history(std::size_t n) std::size_t j = (n - 1) - i; m_arrays->origin_timestamps.erase(j); // Throws } - - m_sync_history_base_version += n; } + + m_sync_history_base_version += n; } void ClientHistory::fix_up_client_file_ident_in_stored_changesets(Transaction& group, diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index cbfb17cd505..0a2288d9a8d 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -19,11 +19,15 @@ #ifndef REALM_NOINST_CLIENT_HISTORY_IMPL_HPP #define REALM_NOINST_CLIENT_HISTORY_IMPL_HPP -#include "realm/util/functional.hpp" -#include +#include #include #include -#include +#include +#include + +namespace realm::_impl::client_reset { +struct RecoveredChange; +} namespace realm::sync { @@ -92,11 +96,12 @@ class ClientHistory final : public _impl::History, public TransformHistory { }; /// set_client_reset_adjustments() is used by client reset to adjust the - /// content of the history compartment. The shared group associated with + /// content of the history compartment. The DB associated with /// this history object must be in a write transaction when this function /// is called. - void set_client_reset_adjustments(version_type current_version, SaltedFileIdent client_file_ident, - SaltedVersion server_version, BinaryData uploadable_changeset); + void set_client_reset_adjustments(util::Logger& logger, version_type current_version, + SaltedFileIdent client_file_ident, SaltedVersion server_version, + const std::vector<_impl::client_reset::RecoveredChange>&); struct LocalChange { version_type version; @@ -152,9 +157,6 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// when engaging in future synchronization sessions. void set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids); - /// Gets the client file ident set with `set_client_file_ident`, or `{0, 0}` if it has never been set. - SaltedFileIdent get_client_file_ident(const Transaction& tr) const; - /// Stores the synchronization progress in the associated Realm file in a /// way that makes it available via get_status() during future /// synchronization sessions. Progress is reported by the server in the @@ -258,11 +260,6 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// generate_changeset_timestamp(). void set_local_origin_timestamp_source(util::UniqueFunction source_fn); -public: // Stuff in this section is only used by tests. - // virtual void set_client_file_ident_in_wt() sets the client file ident. - // The history must be in a write transaction with version 'current_version'. - void set_client_file_ident_in_wt(version_type current_version, SaltedFileIdent client_file_ident); - private: friend class ClientReplication; static constexpr version_type s_initial_version = 1; @@ -373,8 +370,7 @@ class ClientHistory final : public _impl::History, public TransformHistory { // This field is guarded by the DB's write lock and should only be accessed // while that is held. mutable bool m_applying_server_changeset = false; - - util::Optional m_client_reset_changeset; + bool m_applying_client_reset = false; // Cache of s_progress_download_server_version_iip and // s_progress_download_client_version_iip slots of history compartment root diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index d07d874c269..c0e92c76644 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -2233,8 +2233,6 @@ bool Session::client_reset_if_needed() REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0, m_progress.download.last_integrated_client_version); REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version); - REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0, - m_progress.upload.last_integrated_server_version); logger.trace("last_version_available = %1", m_last_version_available); // Throws m_upload_progress = m_progress.upload; diff --git a/src/realm/sync/noinst/client_reset.cpp b/src/realm/sync/noinst/client_reset.cpp index 7819f3980f2..0f4db374552 100644 --- a/src/realm/sync/noinst/client_reset.cpp +++ b/src/realm/sync/noinst/client_reset.cpp @@ -497,6 +497,8 @@ void track_reset(Transaction& wt, ClientResyncMode mode) {timestamp_col, Timestamp(std::chrono::system_clock::now())}, {type_col, mode_val}}); + // Ensure we save the tracker object even if we encounter an error and roll + // back the client reset later wt.commit_and_continue_writing(); } @@ -553,10 +555,10 @@ static ClientResyncMode reset_precheck_guard(Transaction& wt, ClientResyncMode m return mode; } -LocalVersionIDs perform_client_reset_diff(DB& db_local, DB& db_remote, sync::SaltedFileIdent client_file_ident, - util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed, - bool* did_recover_out, sync::SubscriptionStore* sub_store, - util::FunctionRef on_flx_version_complete) +bool perform_client_reset_diff(DB& db_local, DB& db_remote, sync::SaltedFileIdent client_file_ident, + util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed, + sync::SubscriptionStore* sub_store, + util::FunctionRef on_flx_version_complete) { auto wt_local = db_local.start_write(); auto actual_mode = reset_precheck_guard(*wt_local, mode, recovery_is_allowed, logger); @@ -573,7 +575,6 @@ LocalVersionIDs perform_client_reset_diff(DB& db_local, DB& db_remote, sync::Sal auto& repl_local = dynamic_cast(*db_local.get_replication()); auto& history_local = repl_local.get_history(); history_local.ensure_updated(wt_local->get_version()); - SaltedFileIdent orig_file_ident = history_local.get_client_file_ident(*wt_local); VersionID old_version_local = wt_local->get_version_of_current_transaction(); auto& repl_remote = dynamic_cast(*db_remote.get_replication()); @@ -588,126 +589,48 @@ LocalVersionIDs perform_client_reset_diff(DB& db_local, DB& db_remote, sync::Sal fresh_server_version = remote_progress.latest_server_version; } - if (!recover_local_changes) { - auto rt_remote = db_remote.start_read(); - // transform the local Realm such that all public tables become identical to the remote Realm - transfer_group(*rt_remote, *wt_local, logger, false); + TransactionRef tr_remote; + std::vector recovered; + if (recover_local_changes) { + auto frozen_pre_local_state = db_local.start_frozen(); + auto local_changes = history_local.get_local_changes(wt_local->get_version()); + logger.info("Local changesets to recover: %1", local_changes.size()); - // now that the state of the fresh and local Realms are identical, - // reset the local sync history and steal the fresh Realm's ident - history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version, - BinaryData()); - - int64_t subscription_version = 0; - if (sub_store) { - subscription_version = sub_store->set_active_as_latest(*wt_local); - } - - wt_local->commit_and_continue_as_read(); - if (did_recover_out) { - *did_recover_out = false; - } - on_flx_version_complete(subscription_version); - - VersionID new_version_local = wt_local->get_version_of_current_transaction(); - logger.info("perform_client_reset_diff is done: old_version = (version: %1, index: %2), " - "new_version = (version: %3, index: %4)", - old_version_local.version, old_version_local.index, new_version_local.version, - new_version_local.index); - return LocalVersionIDs{old_version_local, new_version_local}; + tr_remote = db_remote.start_write(); + recovered = process_recovered_changesets(*tr_remote, *frozen_pre_local_state, logger, local_changes); + } + else { + tr_remote = db_remote.start_read(); } - auto remake_active_subscription = [&]() { - if (!sub_store) { - return; - } - auto subs = sub_store->get_active(); - int64_t before_version = subs.version(); - auto mut_subs = subs.make_mutable_copy(); - mut_subs.update_state(sync::SubscriptionSet::State::Complete); - auto sub = std::move(mut_subs).commit(); - on_flx_version_complete(sub.version()); - logger.info("Recreated the active subscription set in the complete state (%1 -> %2)", before_version, - sub.version()); - }; - - auto frozen_pre_local_state = db_local.start_frozen(); - auto local_changes = history_local.get_local_changes(wt_local->get_version()); - logger.info("Local changesets to recover: %1", local_changes.size()); - - auto wt_remote = db_remote.start_write(); + // transform the local Realm such that all public tables become identical to the remote Realm + transfer_group(*tr_remote, *wt_local, logger, false); - BinaryData recovered_changeset; + // now that the state of the fresh and local Realms are identical, + // reset the local sync history and steal the fresh Realm's ident + history_local.set_client_reset_adjustments(logger, wt_local->get_version(), client_file_ident, + fresh_server_version, recovered); - // FLX with recovery has to be done in multiple commits, which is significantly different than other modes + int64_t subscription_version = 0; if (sub_store) { - // In FLX recovery, save a copy of the pending subscriptions for later. This - // needs to be done before they are wiped out by remake_active_subscription() - std::vector pending_subscriptions = sub_store->get_pending_subscriptions(); - // transform the local Realm such that all public tables become identical to the remote Realm - transfer_group(*wt_remote, *wt_local, logger, recover_local_changes); - // now that the state of the fresh and local Realms are identical, - // reset the local sync history. - // Note that we do not set the new file ident yet! This is done in the last commit. - history_local.set_client_reset_adjustments(wt_local->get_version(), orig_file_ident, fresh_server_version, - recovered_changeset); - // The local Realm is committed. There are no changes to the remote Realm. - wt_remote->rollback_and_continue_as_read(); - wt_local->commit_and_continue_as_read(); - // Make a copy of the active subscription set and mark it as - // complete. This will cause all other subscription sets to become superceded. - remake_active_subscription(); - // Apply local changes interleaved with pending subscriptions in separate commits - // as needed. This has the consequence that there may be extra notifications along - // the way to the final state, but since separate commits are necessary, this is - // unavoidable. - wt_local = db_local.start_write(); - RecoverLocalChangesetsHandler handler{*wt_local, *frozen_pre_local_state, logger}; - handler.process_changesets(local_changes, std::move(pending_subscriptions)); // throws on error - // The new file ident is set as part of the final commit. This is to ensure that if - // there are any exceptions during recovery, or the process is killed for some - // reason, the client reset cycle detection will catch this and we will not attempt - // to recover again. If we had set the ident in the first commit, a Realm which was - // partially recovered, but interrupted may continue sync the next time it is - // opened with only partially recovered state while having lost the history of any - // offline modifications. - history_local.set_client_file_ident_in_wt(wt_local->get_version(), client_file_ident); - wt_local->commit_and_continue_as_read(); - } - else { - // In PBS recovery, the strategy is to apply all local changes to the remote realm first, - // and then transfer the modified state all at once to the local Realm. This creates a - // nice side effect for notifications because only the minimal state change is made. - RecoverLocalChangesetsHandler handler{*wt_remote, *frozen_pre_local_state, logger}; - handler.process_changesets(local_changes, {}); // throws on error - ChangesetEncoder& encoder = repl_remote.get_instruction_encoder(); - const sync::ChangesetEncoder::Buffer& buffer = encoder.buffer(); - recovered_changeset = {buffer.data(), buffer.size()}; - - // transform the local Realm such that all public tables become identical to the remote Realm - transfer_group(*wt_remote, *wt_local, logger, recover_local_changes); - - // now that the state of the fresh and local Realms are identical, - // reset the local sync history and steal the fresh Realm's ident - history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version, - recovered_changeset); - - // Finally, the local Realm is committed. The changes to the remote Realm are discarded. - wt_remote->rollback_and_continue_as_read(); - wt_local->commit_and_continue_as_read(); + if (recover_local_changes) { + subscription_version = sub_store->mark_active_as_complete(*wt_local); + } + else { + subscription_version = sub_store->set_active_as_latest(*wt_local); + } } - if (did_recover_out) { - *did_recover_out = true; - } + wt_local->commit_and_continue_as_read(); + on_flx_version_complete(subscription_version); + VersionID new_version_local = wt_local->get_version_of_current_transaction(); - logger.info("perform_client_reset_diff is done, old_version.version = %1, " - "old_version.index = %2, new_version.version = %3, " - "new_version.index = %4", + logger.info("perform_client_reset_diff is done: old_version = (version: %1, index: %2), " + "new_version = (version: %3, index: %4)", old_version_local.version, old_version_local.index, new_version_local.version, new_version_local.index); - return LocalVersionIDs{old_version_local, new_version_local}; + return recover_local_changes; } } // namespace realm::_impl::client_reset diff --git a/src/realm/sync/noinst/client_reset.hpp b/src/realm/sync/noinst/client_reset.hpp index fd113bc3d54..a4d7e9b35d1 100644 --- a/src/realm/sync/noinst/client_reset.hpp +++ b/src/realm/sync/noinst/client_reset.hpp @@ -78,18 +78,9 @@ void track_reset(Transaction& wt, ClientResyncMode mode); // If the fresh path is provided, the local Realm is changed such that its state is equal // to the fresh Realm. Then the local Realm will have its client file ident set to // 'client_file_ident' -// -// The function returns the old version and the new version of the local Realm to -// be used to report the sync transaction to the user. -struct LocalVersionIDs { - realm::VersionID old_version; - realm::VersionID new_version; -}; - -LocalVersionIDs perform_client_reset_diff(DB& db, DB& db_remote, sync::SaltedFileIdent client_file_ident, - util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed, - bool* did_recover_out, sync::SubscriptionStore* sub_store, - util::FunctionRef on_flx_version_complete); +bool perform_client_reset_diff(DB& db, DB& db_remote, sync::SaltedFileIdent client_file_ident, util::Logger& logger, + ClientResyncMode mode, bool recovery_is_allowed, sync::SubscriptionStore* sub_store, + util::FunctionRef on_flx_version_complete); } // namespace _impl::client_reset } // namespace realm diff --git a/src/realm/sync/noinst/client_reset_operation.cpp b/src/realm/sync/noinst/client_reset_operation.cpp index 10780bba52d..a51c8d47059 100644 --- a/src/realm/sync/noinst/client_reset_operation.cpp +++ b/src/realm/sync/noinst/client_reset_operation.cpp @@ -92,13 +92,11 @@ bool perform_client_reset(util::Logger& logger, DB& db, DB& fresh_db, ClientResy if (notify_after) { previous_state = db.start_frozen(frozen_before_state_version); } - bool did_recover_out = false; - client_reset::perform_client_reset_diff(db, fresh_db, new_file_ident, logger, mode, recovery_is_allowed, - &did_recover_out, sub_store, - on_flx_version); // throws + bool did_recover = client_reset::perform_client_reset_diff( + db, fresh_db, new_file_ident, logger, mode, recovery_is_allowed, sub_store, on_flx_version); // throws if (notify_after) { - notify_after(previous_state->get_version_of_current_transaction(), did_recover_out); + notify_after(previous_state->get_version_of_current_transaction(), did_recover); } return true; diff --git a/src/realm/sync/noinst/client_reset_recovery.cpp b/src/realm/sync/noinst/client_reset_recovery.cpp index 715787079d3..5fa23545382 100644 --- a/src/realm/sync/noinst/client_reset_recovery.cpp +++ b/src/realm/sync/noinst/client_reset_recovery.cpp @@ -22,24 +22,204 @@ #include #include #include -#include - -#include #include -#include +#include +#include #include +#include +#include #include - #include +#include +#include #include #include using namespace realm; -using namespace _impl; -using namespace sync; +using namespace realm::_impl; +using namespace realm::sync; + +namespace { + +// State tracking of operations on list indices. All list operations in a recovered changeset +// must apply to a "known" index. An index is known if the element at that position was added +// by the recovery itself. If any operation applies to an "unknown" index, the list will go into +// a requires_manual_copy state which means that all further operations on the list are ignored +// and the entire list is copied over verbatim at the end. +struct ListTracker { + struct CrossListIndex { + uint32_t local; + uint32_t remote; + }; + + util::Optional insert(uint32_t local_index, size_t remote_list_size); + util::Optional update(uint32_t index); + void clear(); + bool move(uint32_t from, uint32_t to, size_t lst_size, uint32_t& remote_from_out, uint32_t& remote_to_out); + bool remove(uint32_t index, uint32_t& remote_index_out); + bool requires_manual_copy() const; + void queue_for_manual_copy(); + void mark_as_copied(); + +private: + std::vector m_indices_allowed; + bool m_requires_manual_copy = false; + bool m_has_been_copied = false; +}; + +struct InternDictKey { + bool is_null() const + { + return m_pos == realm::npos && m_size == realm::npos; + } + constexpr bool operator==(const InternDictKey& other) const noexcept + { + return m_pos == other.m_pos && m_size == other.m_size; + } + constexpr bool operator!=(const InternDictKey& other) const noexcept + { + return !operator==(other); + } + constexpr bool operator<(const InternDictKey& other) const noexcept + { + if (m_pos < other.m_pos) { + return true; + } + else if (m_pos == other.m_pos) { + return m_size < other.m_size; + } + return false; + } + +private: + friend struct InterningBuffer; + size_t m_pos = realm::npos; + size_t m_size = realm::npos; +}; + +struct InterningBuffer { + std::string_view get_key(const InternDictKey& key) const; + InternDictKey get_or_add(const std::string_view& str); + +private: + std::string m_dict_keys_buffer; + std::vector m_dict_keys; +}; + +// A wrapper around a PathInstruction which enables storing this path in a +// FlatMap or other container. The advantage of using this instead of a PathInstruction +// is the use of ColKey instead of column names and that because it is not possible to use +// the InternStrings of a PathInstruction because they are tied to a specific Changeset, +// while the ListPath can be used across multiple Changesets. +struct ListPath { + ListPath(TableKey table_key, ObjKey obj_key); + + struct Element { + explicit Element(const InternDictKey& str); + explicit Element(ColKey key); + union { + InternDictKey intern_key; + size_t index; + ColKey col_key; + }; + enum class Type { + InternKey, + ListIndex, + ColumnKey, + } type; + + bool operator==(const Element& other) const noexcept; + bool operator!=(const Element& other) const noexcept; + bool operator<(const Element& other) const noexcept; + }; -namespace realm::_impl::client_reset { + void append(const Element& item); + bool operator<(const ListPath& other) const noexcept; + bool operator==(const ListPath& other) const noexcept; + bool operator!=(const ListPath& other) const noexcept; + std::string path_to_string(Transaction& remote, const InterningBuffer& buffer); + + using const_iterator = typename std::vector::const_iterator; + using iterator = typename std::vector::iterator; + const_iterator begin() const noexcept + { + return m_path.begin(); + } + const_iterator end() const noexcept + { + return m_path.end(); + } + TableKey table_key() const noexcept + { + return m_table_key; + } + ObjKey obj_key() const noexcept + { + return m_obj_key; + } + +private: + std::vector m_path; + TableKey m_table_key; + ObjKey m_obj_key; +}; + +struct RecoverLocalChangesetsHandler : public sync::InstructionApplier { + RecoverLocalChangesetsHandler(Transaction& dest_wt, Transaction& frozen_pre_local_state, util::Logger& logger); + util::AppendBuffer process_changeset(const ChunkedBinaryData& changeset); + +private: + using Instruction = sync::Instruction; + using ListPathCallback = util::UniqueFunction; + + struct RecoveryResolver : public InstructionApplier::PathResolver { + RecoveryResolver(RecoverLocalChangesetsHandler* applier, Instruction::PathInstruction& instr, + const std::string_view& instr_name); + void on_property(Obj&, ColKey) override; + void on_list(LstBase&) override; + Status on_list_index(LstBase&, uint32_t) override; + void on_dictionary(Dictionary&) override; + Status on_dictionary_key(Dictionary&, Mixed) override; + void on_set(SetBase&) override; + void on_error(const std::string&) override; + void on_column_advance(ColKey) override; + void on_dict_key_advance(StringData) override; + Status on_list_index_advance(uint32_t) override; + Status on_null_link_advance(StringData, StringData) override; + Status on_begin(const util::Optional&) override; + void on_finish() override {} + + void set_last_path_index(uint32_t ndx); + + ListPath m_list_path; + Instruction::PathInstruction& m_mutable_instr; + RecoverLocalChangesetsHandler* m_recovery_applier; + }; + + REALM_NORETURN void handle_error(const std::string& message) const; + void copy_lists_with_unrecoverable_changes(); + + bool resolve_path(ListPath& path, Obj remote_obj, Obj local_obj, + util::UniqueFunction callback); + bool resolve(ListPath& path, util::UniqueFunction callback); + +#define REALM_DECLARE_INSTRUCTION_HANDLER(X) void operator()(const Instruction::X&) override; + REALM_FOR_EACH_INSTRUCTION_TYPE(REALM_DECLARE_INSTRUCTION_HANDLER) +#undef REALM_DECLARE_INSTRUCTION_HANDLER + friend struct sync::Instruction; // to allow visitor + +private: + Transaction& m_frozen_pre_local_state; + // Keeping the member variable reference to a logger since the lifetime of this class is + // only within the function that created it. + util::Logger& m_logger; + InterningBuffer m_intern_keys; + // Track any recovered operations on lists to make sure that they are allowed. + // If not, the lists here will be copied verbatim from the local state to the remote. + util::FlatMap m_lists; + Replication* m_replication; +}; util::Optional ListTracker::insert(uint32_t local_index, size_t remote_list_size) { @@ -226,33 +406,6 @@ InternDictKey InterningBuffer::get_or_add(const std::string_view& str) return new_key; } -InternDictKey InterningBuffer::get_interned_key(const std::string_view& str) const -{ - if (str.data() == nullptr) { - return {}; - } - for (auto& key : m_dict_keys) { - StringData existing = get_key(key); - if (existing == str) { - return key; - } - } - throw RuntimeError(ErrorCodes::InvalidArgument, - util::format("InterningBuffer::get_interned_key(%1) did not contain the requested key", str)); - return {}; -} - -std::string InterningBuffer::print() const -{ - return util::format("InterningBuffer of size=%1:'%2'", m_dict_keys.size(), m_dict_keys_buffer); -} - -ListPath::Element::Element(size_t stable_ndx) - : index(stable_ndx) - , type(Type::ListIndex) -{ -} - ListPath::Element::Element(const InternDictKey& str) : intern_key(str) , type(Type::InternKey) @@ -386,79 +539,45 @@ REALM_NORETURN void RecoverLocalChangesetsHandler::handle_error(const std::strin throw realm::_impl::client_reset::ClientResetFailed(full_message); } -void RecoverLocalChangesetsHandler::process_changesets(const std::vector& changesets, - std::vector&& pending_subscriptions) +util::AppendBuffer RecoverLocalChangesetsHandler::process_changeset(const ChunkedBinaryData& changeset) { - // When recovering in PBS, we can iterate through all the changes and apply them in a single commit. - // This has the nice property that any exception while applying will revert the entire recovery and leave - // the Realm in a "pre reset" state. - // - // When recovering in FLX mode, we must apply subscription sets interleaved between the correct commits. - // This handles the case where some objects were subscribed to for only one commit and then unsubscribed after. - - size_t subscription_index = 0; - auto write_pending_subscriptions_up_to = [&](version_type version) { - while (subscription_index < pending_subscriptions.size() && - pending_subscriptions[subscription_index].snapshot_version() <= version) { - if (m_transaction.get_transact_stage() == DB::TransactStage::transact_Writing) { - // List modifications may have happened on an object which we are only subscribed to - // for this commit so we need to apply them as we go. - copy_lists_with_unrecoverable_changes(); - m_transaction.commit_and_continue_as_read(); - } - auto pre_sub = pending_subscriptions[subscription_index++]; - auto post_sub = pre_sub.make_mutable_copy().commit(); - m_logger.info("Recovering pending subscription version: %1 -> %2, snapshot: %3 -> %4", pre_sub.version(), - post_sub.version(), pre_sub.snapshot_version(), post_sub.snapshot_version()); - } - if (m_transaction.get_transact_stage() != DB::TransactStage::transact_Writing) { - m_transaction.promote_to_write(); - } - }; - - for (const ClientHistory::LocalChange& change : changesets) { - if (change.changeset.size() == 0) - continue; - - ChunkedBinaryInputStream in{change.changeset}; - size_t decompressed_size; - auto decompressed = util::compression::decompress_nonportable_input_stream(in, decompressed_size); - if (!decompressed) - continue; - - write_pending_subscriptions_up_to(change.version); + ChunkedBinaryInputStream in{changeset}; + size_t decompressed_size; + auto decompressed = util::compression::decompress_nonportable_input_stream(in, decompressed_size); + if (!decompressed) + return {}; - sync::Changeset parsed_changeset; - sync::parse_changeset(*decompressed, parsed_changeset); // Throws + sync::Changeset parsed_changeset; + sync::parse_changeset(*decompressed, parsed_changeset); // Throws #if REALM_DEBUG - if (m_logger.would_log(util::Logger::Level::trace)) { - std::stringstream dumped_changeset; - parsed_changeset.print(dumped_changeset); - m_logger.trace("Recovering changeset: %1", dumped_changeset.str()); - } + if (m_logger.would_log(util::Logger::Level::trace)) { + std::stringstream dumped_changeset; + parsed_changeset.print(dumped_changeset); + m_logger.trace("Recovering changeset: %1", dumped_changeset.str()); + } #endif - InstructionApplier::begin_apply(parsed_changeset, &m_logger); - for (auto instr : parsed_changeset) { - if (!instr) - continue; - instr->visit(*this); // Throws - } - InstructionApplier::end_apply(); + InstructionApplier::begin_apply(parsed_changeset, &m_logger); + for (auto instr : parsed_changeset) { + if (!instr) + continue; + instr->visit(*this); // Throws } - - // write any remaining subscriptions - write_pending_subscriptions_up_to(std::numeric_limits::max()); - REALM_ASSERT_EX(subscription_index == pending_subscriptions.size(), subscription_index); + InstructionApplier::end_apply(); copy_lists_with_unrecoverable_changes(); + + auto& repl = static_cast(*m_replication); + auto buffer = repl.get_instruction_encoder().release(); + repl.reset(); + return buffer; } void RecoverLocalChangesetsHandler::copy_lists_with_unrecoverable_changes() { // Any modifications, moves or deletes to list elements which were not also created in the recovery // cannot be reliably applied because there is no way to know if the indices on the server have - // shifted without a reliable server side history. For these lists, create a consistant state by + // shifted without a reliable server side history. For these lists, create a consistent state by // copying over the entire list from the recovering client's state. This does create a "last recovery wins" // scenario for modifications to lists, but this is only a best effort. // For example, consider a list [A,B]. @@ -690,13 +809,6 @@ RecoverLocalChangesetsHandler::RecoveryResolver::on_begin(const util::Optional +client_reset::process_recovered_changesets(Transaction& dest_tr, Transaction& pre_reset_state, util::Logger& logger, + const std::vector& local_changes) +{ + RecoverLocalChangesetsHandler handler(dest_tr, pre_reset_state, logger); + std::vector encoded; + for (auto& local_change : local_changes) { + encoded.push_back({handler.process_changeset(local_change.changeset), local_change.version}); + } + return encoded; +} diff --git a/src/realm/sync/noinst/client_reset_recovery.hpp b/src/realm/sync/noinst/client_reset_recovery.hpp index 91230b4452b..626089af7e9 100644 --- a/src/realm/sync/noinst/client_reset_recovery.hpp +++ b/src/realm/sync/noinst/client_reset_recovery.hpp @@ -19,202 +19,19 @@ #ifndef REALM_NOINST_CLIENT_RESET_RECOVERY_HPP #define REALM_NOINST_CLIENT_RESET_RECOVERY_HPP -#include -#include -#include -#include #include -#include -#include +#include +#include namespace realm::_impl::client_reset { - -// State tracking of operations on list indices. All list operations in a recovered changeset -// must apply to a "known" index. An index is known if the element at that position was added -// by the recovery itself. If any operation applies to an "unknown" index, the list will go into -// a requires_manual_copy state which means that all further operations on the list are ignored -// and the entire list is copied over verbatim at the end. -struct ListTracker { - struct CrossListIndex { - uint32_t local; - uint32_t remote; - }; - - util::Optional insert(uint32_t local_index, size_t remote_list_size); - util::Optional update(uint32_t index); - void clear(); - bool move(uint32_t from, uint32_t to, size_t lst_size, uint32_t& remote_from_out, uint32_t& remote_to_out); - bool remove(uint32_t index, uint32_t& remote_index_out); - bool requires_manual_copy() const; - void queue_for_manual_copy(); - void mark_as_copied(); - -private: - std::vector m_indices_allowed; - bool m_requires_manual_copy = false; - bool m_has_been_copied = false; -}; - -struct InternDictKey { - bool is_null() const - { - return m_pos == realm::npos && m_size == realm::npos; - } - constexpr bool operator==(const InternDictKey& other) const noexcept - { - return m_pos == other.m_pos && m_size == other.m_size; - } - constexpr bool operator!=(const InternDictKey& other) const noexcept - { - return !operator==(other); - } - constexpr bool operator<(const InternDictKey& other) const noexcept - { - if (m_pos < other.m_pos) { - return true; - } - else if (m_pos == other.m_pos) { - return m_size < other.m_size; - } - return false; - } - -private: - friend struct InterningBuffer; - size_t m_pos = realm::npos; - size_t m_size = realm::npos; -}; - -struct InterningBuffer { - std::string_view get_key(const InternDictKey& key) const; - InternDictKey get_or_add(const std::string_view& str); - InternDictKey get_interned_key(const std::string_view& str) const; // throws if the str is not found - std::string print() const; - -private: - std::string m_dict_keys_buffer; - std::vector m_dict_keys; -}; - -// A wrapper around a PathInstruction which enables storing this path in a -// FlatMap or other container. The advantage of using this instead of a PathInstruction -// is the use of ColKey instead of column names and that because it is not possible to use -// the InternStrings of a PathInstruction because they are tied to a specific Changeset, -// while the ListPath can be used across multiple Changesets. -struct ListPath { - ListPath(TableKey table_key, ObjKey obj_key); - - struct Element { - explicit Element(size_t stable_ndx); - explicit Element(const InternDictKey& str); - explicit Element(ColKey key); - union { - InternDictKey intern_key; - size_t index; - ColKey col_key; - }; - enum class Type { - InternKey, - ListIndex, - ColumnKey, - } type; - - bool operator==(const Element& other) const noexcept; - bool operator!=(const Element& other) const noexcept; - bool operator<(const Element& other) const noexcept; - }; - - void append(const Element& item); - bool operator<(const ListPath& other) const noexcept; - bool operator==(const ListPath& other) const noexcept; - bool operator!=(const ListPath& other) const noexcept; - std::string path_to_string(Transaction& remote, const InterningBuffer& buffer); - - using const_iterator = typename std::vector::const_iterator; - using iterator = typename std::vector::iterator; - const_iterator begin() const noexcept - { - return m_path.begin(); - } - const_iterator end() const noexcept - { - return m_path.end(); - } - TableKey table_key() const noexcept - { - return m_table_key; - } - ObjKey obj_key() const noexcept - { - return m_obj_key; - } - -private: - std::vector m_path; - TableKey m_table_key; - ObjKey m_obj_key; -}; - -struct RecoverLocalChangesetsHandler : public sync::InstructionApplier { - RecoverLocalChangesetsHandler(Transaction& dest_wt, Transaction& frozen_pre_local_state, util::Logger& logger); - void process_changesets(const std::vector& changesets, - std::vector&& pending_subs); - -protected: - using Instruction = sync::Instruction; - using ListPathCallback = util::UniqueFunction; - - struct RecoveryResolver : public InstructionApplier::PathResolver { - RecoveryResolver(RecoverLocalChangesetsHandler* applier, Instruction::PathInstruction& instr, - const std::string_view& instr_name); - void on_property(Obj&, ColKey) override; - void on_list(LstBase&) override; - Status on_list_index(LstBase&, uint32_t) override; - void on_dictionary(Dictionary&) override; - Status on_dictionary_key(Dictionary&, Mixed) override; - void on_set(SetBase&) override; - void on_error(const std::string&) override; - void on_column_advance(ColKey) override; - void on_dict_key_advance(StringData) override; - Status on_list_index_advance(uint32_t) override; - Status on_null_link_advance(StringData, StringData) override; - Status on_begin(const util::Optional&) override; - void on_finish() override; - - ListPath& list_path(); - void set_last_path_index(uint32_t ndx); - - protected: - ListPath m_list_path; - Instruction::PathInstruction& m_mutable_instr; - RecoverLocalChangesetsHandler* m_recovery_applier; - }; - friend struct RecoveryResolver; - - REALM_NORETURN void handle_error(const std::string& message) const; - void copy_lists_with_unrecoverable_changes(); - - bool resolve_path(ListPath& path, Obj remote_obj, Obj local_obj, - util::UniqueFunction callback); - bool resolve(ListPath& path, util::UniqueFunction callback); - -#define REALM_DECLARE_INSTRUCTION_HANDLER(X) void operator()(const Instruction::X&) override; - REALM_FOR_EACH_INSTRUCTION_TYPE(REALM_DECLARE_INSTRUCTION_HANDLER) -#undef REALM_DECLARE_INSTRUCTION_HANDLER - friend struct sync::Instruction; // to allow visitor - -private: - Transaction& m_frozen_pre_local_state; - // Keeping the member variable reference to a logger since the lifetime of this class is - // only within the function that created it. - util::Logger& m_logger; - InterningBuffer m_intern_keys; - // Track any recovered operations on lists to make sure that they are allowed. - // If not, the lists here will be copied verbatim from the local state to the remote. - util::FlatMap m_lists; - Replication* m_replication; +struct RecoveredChange { + util::AppendBuffer encoded_changeset; + sync::ClientHistory::version_type version; }; +std::vector +process_recovered_changesets(Transaction& dest_tr, Transaction& pre_reset_state, util::Logger& logger, + const std::vector& changesets); } // namespace realm::_impl::client_reset #endif // REALM_NOINST_CLIENT_RESET_RECOVERY_HPP diff --git a/src/realm/sync/noinst/migration_store.cpp b/src/realm/sync/noinst/migration_store.cpp index 5149b76a578..7043b2343d6 100644 --- a/src/realm/sync/noinst/migration_store.cpp +++ b/src/realm/sync/noinst/migration_store.cpp @@ -305,7 +305,7 @@ Subscription MigrationStore::make_subscription(const std::string& object_class_n return Subscription{subscription_name, object_class_name, rql_query_string}; } -void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store) +void MigrationStore::create_subscriptions(SubscriptionStore& subs_store) { std::unique_lock lock{m_mutex}; if (m_state != MigrationState::Migrated) { @@ -316,7 +316,7 @@ void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store) create_subscriptions(subs_store, *m_query_string); } -void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string) +void MigrationStore::create_subscriptions(SubscriptionStore& subs_store, const std::string& rql_query_string) { if (rql_query_string.empty()) { return; @@ -355,7 +355,7 @@ void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, c mut_sub.commit(); } -void MigrationStore::create_sentinel_subscription_set(const SubscriptionStore& subs_store) +void MigrationStore::create_sentinel_subscription_set(SubscriptionStore& subs_store) { std::lock_guard lock{m_mutex}; if (m_state != MigrationState::Migrated) { diff --git a/src/realm/sync/noinst/migration_store.hpp b/src/realm/sync/noinst/migration_store.hpp index 7b976b8c7d8..084d4391683 100644 --- a/src/realm/sync/noinst/migration_store.hpp +++ b/src/realm/sync/noinst/migration_store.hpp @@ -78,13 +78,13 @@ class MigrationStore : public std::enable_shared_from_this { // Create subscriptions for each table that does not have a subscription. // If new subscriptions are created, they are committed and a change of query is sent to the server. - void create_subscriptions(const SubscriptionStore& subs_store); - void create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string); + void create_subscriptions(SubscriptionStore& subs_store); + void create_subscriptions(SubscriptionStore& subs_store, const std::string& rql_query_string); // Create a subscription set used as sentinel. No-op if not in 'Migrated' state. // This method is idempotent (i.e, at most one subscription set can be created during the lifetime of a // migration) - void create_sentinel_subscription_set(const SubscriptionStore& subs_store); + void create_sentinel_subscription_set(SubscriptionStore& subs_store); std::optional get_sentinel_subscription_set_version(); protected: @@ -126,4 +126,4 @@ class MigrationStore : public std::enable_shared_from_this { std::optional m_sentinel_subscription_set_version; }; -} // namespace realm::sync \ No newline at end of file +} // namespace realm::sync diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp index 0b902f6ac0c..6b8b4adb5ba 100644 --- a/src/realm/sync/subscriptions.cpp +++ b/src/realm/sync/subscriptions.cpp @@ -168,7 +168,7 @@ Subscription::Subscription(util::Optional name, std::string object_ } -SubscriptionSet::SubscriptionSet(std::weak_ptr mgr, const Transaction& tr, const Obj& obj, +SubscriptionSet::SubscriptionSet(std::weak_ptr mgr, const Transaction& tr, const Obj& obj, MakingMutableCopy making_mutable_copy) : m_mgr(mgr) , m_cur_version(tr.get_version()) @@ -181,7 +181,7 @@ SubscriptionSet::SubscriptionSet(std::weak_ptr mgr, con } } -SubscriptionSet::SubscriptionSet(std::weak_ptr mgr, int64_t version, SupersededTag) +SubscriptionSet::SubscriptionSet(std::weak_ptr mgr, int64_t version, SupersededTag) : m_mgr(mgr) , m_version(version) , m_state(State::Superseded) @@ -202,7 +202,7 @@ void SubscriptionSet::load_from_database(const Obj& obj) } } -std::shared_ptr SubscriptionSet::get_flx_subscription_store() const +std::shared_ptr SubscriptionSet::get_flx_subscription_store() const { if (auto mgr = m_mgr.lock()) { return mgr; @@ -273,12 +273,10 @@ const Subscription* SubscriptionSet::find(const Query& query) const return nullptr; } -MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr mgr, TransactionRef tr, Obj obj, - MakingMutableCopy making_mutable_copy) - : SubscriptionSet(mgr, *tr, obj, making_mutable_copy) +MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr mgr, TransactionRef tr, Obj obj) + : SubscriptionSet(mgr, *tr, obj, MakingMutableCopy{true}) , m_tr(std::move(tr)) , m_obj(std::move(obj)) - , m_old_state(state()) { } @@ -408,55 +406,21 @@ std::pair MutableSubscriptionSet::insert_or_ass return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str)); } -void MutableSubscriptionSet::import(const SubscriptionSet& src_subs) +void MutableSubscriptionSet::import(SubscriptionSet&& src_subs) { - clear(); - for (const Subscription& sub : src_subs) { - insert_sub(sub); - } + check_is_mutable(); + SubscriptionSet::import(std::move(src_subs)); } -void MutableSubscriptionSet::update_state(State new_state, util::Optional error_str) +void SubscriptionSet::import(SubscriptionSet&& src_subs) { - check_is_mutable(); - auto old_state = state(); - if (error_str && new_state != State::Error) { - throw LogicError(ErrorCodes::InvalidArgument, - "Cannot supply an error message for a subscription set when state is not Error"); - } - switch (new_state) { - case State::Uncommitted: - throw LogicError(ErrorCodes::InvalidArgument, "cannot set subscription set state to uncommitted"); - - case State::Error: - if (old_state != State::Bootstrapping && old_state != State::Pending && old_state != State::Uncommitted) { - throw LogicError(ErrorCodes::InvalidArgument, - "subscription set must be in Bootstrapping or Pending to update state to error"); - } - if (!error_str) { - throw LogicError(ErrorCodes::InvalidArgument, - "Must supply an error message when setting a subscription to the error state"); - } + m_subs = std::move(src_subs.m_subs); +} - m_state = new_state; - m_error_str = std::string{*error_str}; - break; - case State::Bootstrapping: - [[fallthrough]]; - case State::AwaitingMark: - m_state = new_state; - break; - case State::Complete: { - auto mgr = get_flx_subscription_store(); // Throws - m_state = new_state; - mgr->supercede_prior_to(m_tr, version()); - break; - } - case State::Superseded: - throw LogicError(ErrorCodes::InvalidArgument, "Cannot set a subscription to the superseded state"); - case State::Pending: - throw LogicError(ErrorCodes::InvalidArgument, "Cannot set subscription set to the pending state"); - } +void MutableSubscriptionSet::set_state(State new_state) +{ + REALM_ASSERT(m_state == State::Uncommitted); + m_state = new_state; } MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const @@ -478,7 +442,7 @@ util::Future SubscriptionSet::get_state_change_notificat auto mgr = get_flx_subscription_store(); // Throws util::CheckedLockGuard lk(mgr->m_pending_notifications_mutex); - // If we've already been superceded by another version getting completed, then we should skip registering + // If we've already been superseded by another version getting completed, then we should skip registering // a notification because it may never fire. if (mgr->m_min_outstanding_version > version()) { return util::Future::make_ready(State::Superseded); @@ -522,30 +486,27 @@ void SubscriptionSet::get_state_change_notification( }); } -void MutableSubscriptionSet::process_notifications() +void SubscriptionStore::process_notifications(State new_state, int64_t version, std::string_view error_str) { - auto mgr = get_flx_subscription_store(); // Throws - auto new_state = state(); - std::list to_finish; { - util::CheckedLockGuard lk(mgr->m_pending_notifications_mutex); - splice_if(mgr->m_pending_notifications, to_finish, [&](auto& req) { - return (req.version == m_version && + util::CheckedLockGuard lk(m_pending_notifications_mutex); + splice_if(m_pending_notifications, to_finish, [&](auto& req) { + return (req.version == version && (new_state == State::Error || state_to_order(new_state) >= state_to_order(req.notify_when))) || - (new_state == State::Complete && req.version < m_version); + (new_state == State::Complete && req.version < version); }); if (new_state == State::Complete) { - mgr->m_min_outstanding_version = m_version; + m_min_outstanding_version = version; } } for (auto& req : to_finish) { - if (new_state == State::Error && req.version == m_version) { - req.promise.set_error({ErrorCodes::SubscriptionFailed, std::string_view(error_str())}); + if (new_state == State::Error && req.version == version) { + req.promise.set_error({ErrorCodes::SubscriptionFailed, error_str}); } - else if (req.version < m_version) { + else if (req.version < version) { req.promise.emplace_value(State::Superseded); } else { @@ -557,30 +518,27 @@ void MutableSubscriptionSet::process_notifications() SubscriptionSet MutableSubscriptionSet::commit() { if (m_tr->get_transact_stage() != DB::transact_Writing) { - throw RuntimeError(ErrorCodes::WrongTransactionState, "SubscriptionSet is not in a commitable state"); + throw LogicError(ErrorCodes::WrongTransactionState, "SubscriptionSet has already been committed"); } auto mgr = get_flx_subscription_store(); // Throws - if (m_old_state == State::Uncommitted) { - if (m_state == State::Uncommitted) { - m_state = State::Pending; - } - m_obj.set(mgr->m_sub_set_snapshot_version, static_cast(m_tr->get_version())); - - auto obj_sub_list = m_obj.get_linklist(mgr->m_sub_set_subscriptions); - obj_sub_list.clear(); - for (const auto& sub : m_subs) { - auto new_sub = - obj_sub_list.create_and_insert_linked_object(obj_sub_list.is_empty() ? 0 : obj_sub_list.size()); - new_sub.set(mgr->m_sub_id, sub.id); - new_sub.set(mgr->m_sub_created_at, sub.created_at); - new_sub.set(mgr->m_sub_updated_at, sub.updated_at); - if (sub.name) { - new_sub.set(mgr->m_sub_name, StringData(*sub.name)); - } - new_sub.set(mgr->m_sub_object_class_name, StringData(sub.object_class_name)); - new_sub.set(mgr->m_sub_query_str, StringData(sub.query_string)); + if (m_state == State::Uncommitted) { + m_state = State::Pending; + } + m_obj.set(mgr->m_sub_set_snapshot_version, static_cast(m_tr->get_version())); + + auto obj_sub_list = m_obj.get_linklist(mgr->m_sub_set_subscriptions); + obj_sub_list.clear(); + for (const auto& sub : m_subs) { + auto new_sub = obj_sub_list.create_and_insert_linked_object(obj_sub_list.size()); + new_sub.set(mgr->m_sub_id, sub.id); + new_sub.set(mgr->m_sub_created_at, sub.created_at); + new_sub.set(mgr->m_sub_updated_at, sub.updated_at); + if (sub.name) { + new_sub.set(mgr->m_sub_name, StringData(*sub.name)); } + new_sub.set(mgr->m_sub_object_class_name, StringData(sub.object_class_name)); + new_sub.set(mgr->m_sub_query_str, StringData(sub.query_string)); } m_obj.set(mgr->m_sub_set_state, state_to_storage(m_state)); if (!m_error_str.empty()) { @@ -590,7 +548,7 @@ SubscriptionSet MutableSubscriptionSet::commit() const auto flx_version = version(); m_tr->commit_and_continue_as_read(); - process_notifications(); + mgr->process_notifications(m_state, flx_version, std::string_view(error_str())); return mgr->get_refreshed(m_obj.get_key(), flx_version, m_tr->get_version_of_current_transaction()); } @@ -720,7 +678,7 @@ void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool } } -SubscriptionSet SubscriptionStore::get_latest() const +SubscriptionSet SubscriptionStore::get_latest() { auto tr = m_db->start_frozen(); auto sub_sets = tr->get_table(m_sub_set_table); @@ -733,13 +691,13 @@ SubscriptionSet SubscriptionStore::get_latest() const return SubscriptionSet(weak_from_this(), *tr, latest_obj); } -SubscriptionSet SubscriptionStore::get_active() const +SubscriptionSet SubscriptionStore::get_active() { auto tr = m_db->start_frozen(); return SubscriptionSet(weak_from_this(), *tr, get_active(*tr)); } -Obj SubscriptionStore::get_active(const Transaction& tr) const +Obj SubscriptionStore::get_active(const Transaction& tr) { auto sub_sets = tr.get_table(m_sub_set_table); // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions. @@ -818,7 +776,7 @@ SubscriptionStore::get_next_pending_version(int64_t last_query_version) const return PendingSubscription{query_version, static_cast(snapshot_version)}; } -std::vector SubscriptionStore::get_pending_subscriptions() const +std::vector SubscriptionStore::get_pending_subscriptions() { std::vector subscriptions_to_recover; auto active_sub = get_active(); @@ -859,18 +817,60 @@ void SubscriptionStore::terminate() } } -MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id) +void SubscriptionStore::update_state(int64_t version, State new_state, std::optional error_str) { + REALM_ASSERT(error_str.has_value() == (new_state == State::Error)); + REALM_ASSERT(new_state != State::Pending); + REALM_ASSERT(new_state != State::Superseded); + auto tr = m_db->start_write(); auto sub_sets = tr->get_table(m_sub_set_table); - auto obj = sub_sets->get_object_with_primary_key(Mixed{version_id}); + auto obj = sub_sets->get_object_with_primary_key(version); if (!obj) { - throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id)); + // This can happen either due to a bug in the sync client or due to the + // server sending us an error message for an invalid query version. We + // assume it is the latter here. + throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed, + util::format("Invalid state update for nonexistent query version %1", version)); } - return MutableSubscriptionSet(weak_from_this(), std::move(tr), obj); + + auto old_state = state_from_storage(obj.get(m_sub_set_state)); + switch (new_state) { + case State::Error: + if (old_state == State::Complete) { + throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed, + util::format("Received error '%1' for already-completed query version %2. This " + "may be due to a queryable field being removed in the server-side " + "configuration making the previous subscription set no longer valid.", + *error_str, version)); + } + break; + + case State::Bootstrapping: + case State::AwaitingMark: + REALM_ASSERT(old_state != State::Complete); + REALM_ASSERT(old_state != State::Error); + break; + + case State::Complete: + supercede_prior_to(tr, version); + break; + + case State::Uncommitted: + case State::Superseded: + case State::Pending: + REALM_TERMINATE("Illegal new state for subscription set"); + } + + obj.set(m_sub_set_state, state_to_storage(new_state)); + obj.set(m_sub_set_error_str, error_str ? StringData(*error_str) : StringData()); + + tr->commit(); + + process_notifications(new_state, version, error_str.value_or(std::string_view{})); } -SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const +SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) { auto tr = m_db->start_frozen(); auto sub_sets = tr->get_table(m_sub_set_table); @@ -885,8 +885,7 @@ SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id)); } -SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version, - std::optional db_version) const +SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version, std::optional db_version) { auto tr = m_db->start_frozen(db_version.value_or(VersionID{})); auto sub_sets = tr->get_table(m_sub_set_table); @@ -923,7 +922,7 @@ void SubscriptionStore::supercede_prior_to(TransactionRef tr, int64_t version_id remove_query.remove(); } -MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set) const +MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set) { auto new_tr = m_db->start_write(); @@ -931,8 +930,7 @@ MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSe auto new_pk = sub_sets->max(sub_sets->get_primary_key_column())->get_int() + 1; MutableSubscriptionSet new_set_obj(weak_from_this(), std::move(new_tr), - sub_sets->create_object_with_primary_key(Mixed{new_pk}), - SubscriptionSet::MakingMutableCopy{true}); + sub_sets->create_object_with_primary_key(Mixed{new_pk})); for (const auto& sub : set) { new_set_obj.insert_sub(sub); } @@ -973,4 +971,25 @@ int64_t SubscriptionStore::set_active_as_latest(Transaction& wt) return version; } +int64_t SubscriptionStore::mark_active_as_complete(Transaction& wt) +{ + auto active = get_active(wt); + active.set(m_sub_set_state, state_to_storage(State::Complete)); + auto version = active.get_primary_key().get_int(); + + std::list to_finish; + { + util::CheckedLockGuard lock(m_pending_notifications_mutex); + splice_if(m_pending_notifications, to_finish, [&](auto& req) { + return req.version == version && state_to_order(req.notify_when) <= state_to_order(State::Complete); + }); + } + + for (auto& req : to_finish) { + req.promise.emplace_value(State::Complete); + } + + return version; +} + } // namespace realm::sync diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp index c1ce7f7344d..636b4164631 100644 --- a/src/realm/sync/subscriptions.hpp +++ b/src/realm/sync/subscriptions.hpp @@ -194,16 +194,17 @@ class SubscriptionSet { }; using MakingMutableCopy = util::TaggedBool; - explicit SubscriptionSet(std::weak_ptr mgr, int64_t version, SupersededTag); - explicit SubscriptionSet(std::weak_ptr mgr, const Transaction& tr, const Obj& obj, + explicit SubscriptionSet(std::weak_ptr mgr, int64_t version, SupersededTag); + explicit SubscriptionSet(std::weak_ptr mgr, const Transaction& tr, const Obj& obj, MakingMutableCopy making_mutable_copy = false); void load_from_database(const Obj& obj); + void import(SubscriptionSet&&); // Get a reference to the SubscriptionStore. It may briefly extend the lifetime of the store. - std::shared_ptr get_flx_subscription_store() const; + std::shared_ptr get_flx_subscription_store() const; - std::weak_ptr m_mgr; + std::weak_ptr m_mgr; DB::version_type m_cur_version = 0; int64_t m_version = 0; @@ -241,7 +242,7 @@ class MutableSubscriptionSet : public SubscriptionSet { // will have std::pair insert_or_assign(const Query& query); - void import(const SubscriptionSet&); + void import(SubscriptionSet&&); // Erases a subscription pointed to by an iterator. Returns the "next" iterator in the set - to provide // STL compatibility. The SubscriptionSet must be in the Uncommitted state to call this - otherwise @@ -255,27 +256,20 @@ class MutableSubscriptionSet : public SubscriptionSet { bool erase_by_class_name(StringData object_class_name); bool erase_by_id(ObjectId id); - // Updates the state of the transaction and optionally updates its error information. - // - // You may only set an error_str when the State is State::Error. - // - // If set to State::Complete, this will erase all subscription sets with a version less than this one's. - // - // This should be called internally within the sync client. - void update_state(State state, util::Optional error_str = util::none); - // This commits any changes to the subscription set and returns an this subscription set as an immutable view // from after the commit. This MutableSubscriptionSet object must not be used after calling commit(). SubscriptionSet commit(); + // For testing and internal usage only. + void set_state(State new_state); + protected: friend class SubscriptionStore; // Allow the MigrationStore access to insert_sub because it cannot use insert_or_assign due to having the query as // a string and not a Query object. friend class MigrationStore; - MutableSubscriptionSet(std::weak_ptr mgr, TransactionRef tr, Obj obj, - MakingMutableCopy making_mutable_copy = MakingMutableCopy{false}); + MutableSubscriptionSet(std::weak_ptr mgr, TransactionRef tr, Obj obj); void insert_sub(const Subscription& sub); @@ -291,11 +285,8 @@ class MutableSubscriptionSet : public SubscriptionSet { void insert_sub_impl(ObjectId id, Timestamp created_at, Timestamp updated_at, StringData name, StringData object_class_name, StringData query_str); - void process_notifications(); - TransactionRef m_tr; Obj m_obj; - State m_old_state; }; class SubscriptionStore; @@ -312,12 +303,12 @@ class SubscriptionStore : public std::enable_shared_from_this // Get the latest subscription created by calling update_latest(). Once bootstrapping is complete, // this and get_active() will return the same thing. If no SubscriptionSet has been set, then // this returns an empty SubscriptionSet that you can clone() in order to mutate. - SubscriptionSet get_latest() const; + SubscriptionSet get_latest(); // Gets the subscription set that has been acknowledged by the server as having finished bootstrapping. // If no subscriptions have reached the complete stage, this returns an empty subscription with version // zero. - SubscriptionSet get_active() const; + SubscriptionSet get_active(); struct VersionInfo { int64_t latest; @@ -328,13 +319,9 @@ class SubscriptionStore : public std::enable_shared_from_this // that the versions will be read from the same underlying transaction and will thus be consistent. VersionInfo get_version_info() const; - // To be used internally by the sync client. This returns a mutable view of a subscription set by its - // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound. - MutableSubscriptionSet get_mutable_by_version(int64_t version_id); - // To be used internally by the sync client. This returns a read-only view of a subscription set by its // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound. - SubscriptionSet get_by_version(int64_t version_id) const REQUIRES(!m_pending_notifications_mutex); + SubscriptionSet get_by_version(int64_t version_id) REQUIRES(!m_pending_notifications_mutex); // Returns true if there have been commits to the DB since the given version bool would_refresh(DB::version_type version) const noexcept; @@ -348,7 +335,20 @@ class SubscriptionStore : public std::enable_shared_from_this }; util::Optional get_next_pending_version(int64_t last_query_version) const; - std::vector get_pending_subscriptions() const REQUIRES(!m_pending_notifications_mutex); + std::vector get_pending_subscriptions() REQUIRES(!m_pending_notifications_mutex); + + // Update the state of an existing subscription set. `error_str` must be set + // if the new state is Error, and must not be set otherwise. Many state + // transitions are not legal; see the state diagram on SubscriptionSet. + // + // If set to State::Complete, this will erase all subscription sets with a + // version less than the given one. + // + // This should only be called internally within the sync client. + void update_state(int64_t version_id, SubscriptionSet::State state, + std::optional error_str = util::none) + REQUIRES(!m_pending_notifications_mutex); + int64_t mark_active_as_complete(Transaction& wt) REQUIRES(!m_pending_notifications_mutex); // Notify all subscription state change notification handlers on this subscription store with the // provided Status - this does not change the state of any pending subscriptions. @@ -386,12 +386,13 @@ class SubscriptionStore : public std::enable_shared_from_this SubscriptionSet::State notify_when; }; + void process_notifications(State new_state, int64_t version, std::string_view error_str) + REQUIRES(!m_pending_notifications_mutex); void supercede_prior_to(TransactionRef tr, int64_t version_id) const; - Obj get_active(const Transaction& tr) const; - SubscriptionSet get_refreshed(ObjKey, int64_t flx_version, - std::optional version = util::none) const; - MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const; + Obj get_active(const Transaction& tr); + SubscriptionSet get_refreshed(ObjKey, int64_t flx_version, std::optional version = util::none); + MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set); // Ensure the subscriptions table is properly initialized // If clear_table is true, the subscriptions table will be cleared before initialization @@ -416,9 +417,9 @@ class SubscriptionStore : public std::enable_shared_from_this ColKey m_sub_set_error_str; ColKey m_sub_set_subscriptions; - mutable util::CheckedMutex m_pending_notifications_mutex; - mutable int64_t m_min_outstanding_version GUARDED_BY(m_pending_notifications_mutex) = 0; - mutable std::list m_pending_notifications GUARDED_BY(m_pending_notifications_mutex); + util::CheckedMutex m_pending_notifications_mutex; + int64_t m_min_outstanding_version GUARDED_BY(m_pending_notifications_mutex) = 0; + std::list m_pending_notifications GUARDED_BY(m_pending_notifications_mutex); }; } // namespace realm::sync diff --git a/src/realm/util/future.hpp b/src/realm/util/future.hpp index da22772f8cf..efe7e4a17ee 100644 --- a/src/realm/util/future.hpp +++ b/src/realm/util/future.hpp @@ -844,7 +844,7 @@ class REALM_NODISCARD future_details::Future { } } - /* + /** * Callbacks passed to on_completion() are always called with a StatusWith when the input future completes. */ template diff --git a/test/object-store/benchmarks/client_reset.cpp b/test/object-store/benchmarks/client_reset.cpp index a06252e0ea9..93a83fe99fb 100644 --- a/test/object-store/benchmarks/client_reset.cpp +++ b/test/object-store/benchmarks/client_reset.cpp @@ -144,8 +144,7 @@ struct BenchmarkLocalClientReset : public reset_utils::TestClientReset { auto history_local = dynamic_cast(wt_local.get_replication()->_get_history_write()); std::vector local_changes = history_local->get_local_changes(current_local_version.version); - _impl::client_reset::RecoverLocalChangesetsHandler handler{wt_remote, frozen_local, logger}; - handler.process_changesets(local_changes, {}); // throws on error + _impl::client_reset::process_recovered_changesets(wt_remote, frozen_local, logger, local_changes); } _impl::client_reset::transfer_group(wt_remote, wt_local, logger, m_mode == ClientResyncMode::Recover); if (m_on_post_reset) { diff --git a/test/object-store/sync/client_reset.cpp b/test/object-store/sync/client_reset.cpp index e6d61a4710f..914585ead7e 100644 --- a/test/object-store/sync/client_reset.cpp +++ b/test/object-store/sync/client_reset.cpp @@ -1866,73 +1866,64 @@ TEST_CASE("sync: Client reset during async open", "[sync][pbs][client reset][baa TestAppSession test_app_session(create_app(server_app_config)); auto app = test_app_session.app(); - auto before_callback_called = util::make_promise_future(); - auto after_callback_called = util::make_promise_future(); create_user_and_log_in(app); SyncTestFile realm_config(app->current_user(), partition.value, std::nullopt, [](std::shared_ptr, SyncError) { /*noop*/ }); realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover; - realm_config.sync_config->on_sync_client_event_hook = - [&, client_reset_triggered = false](std::weak_ptr weak_sess, - const SyncClientHookData& event_data) mutable { - auto sess = weak_sess.lock(); - if (!sess) { - return SyncClientHookAction::NoAction; - } - if (sess->path() != realm_config.path) { - return SyncClientHookAction::NoAction; - } + bool client_reset_triggered = false; + realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr weak_sess, + const SyncClientHookData& event_data) mutable { + auto sess = weak_sess.lock(); + if (!sess) { + return SyncClientHookAction::NoAction; + } + if (sess->path() != realm_config.path) { + return SyncClientHookAction::NoAction; + } - if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) { - return SyncClientHookAction::NoAction; - } + if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) { + return SyncClientHookAction::NoAction; + } - if (client_reset_triggered) { - return SyncClientHookAction::NoAction; - } - client_reset_triggered = true; - reset_utils::trigger_client_reset(test_app_session.app_session()); - return SyncClientHookAction::EarlyReturn; - }; + if (client_reset_triggered) { + return SyncClientHookAction::NoAction; + } + client_reset_triggered = true; + reset_utils::trigger_client_reset(test_app_session.app_session(), *sess); + return SyncClientHookAction::SuspendWithRetryableError; + }; // Expected behaviour is that the frozen realm passed in the callback should have no // schema initialized if a client reset happens during an async open and the realm has never been opened before. // SDK's should handle any edge cases which require the use of a schema i.e // calling set_schema_subset(...) - realm_config.sync_config->notify_before_client_reset = - [promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))]( - std::shared_ptr realm) mutable { - CHECK(realm->schema_version() == ObjectStore::NotVersioned); - promise.get_promise().emplace_value(); - }; + auto before_callback_called = util::make_promise_future(); + realm_config.sync_config->notify_before_client_reset = [&](std::shared_ptr realm) { + CHECK(realm->schema_version() == ObjectStore::NotVersioned); + before_callback_called.promise.emplace_value(); + }; - realm_config.sync_config->notify_after_client_reset = - [promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))]( - std::shared_ptr realm, ThreadSafeReference, bool) mutable { - CHECK(realm->schema_version() == ObjectStore::NotVersioned); - promise.get_promise().emplace_value(); - }; + auto after_callback_called = util::make_promise_future(); + realm_config.sync_config->notify_after_client_reset = [&](std::shared_ptr realm, ThreadSafeReference, + bool) { + CHECK(realm->schema_version() == ObjectStore::NotVersioned); + after_callback_called.promise.emplace_value(); + }; auto realm_task = Realm::get_synchronized_realm(realm_config); auto realm_pf = util::make_promise_future(); - realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))]( - ThreadSafeReference ref, std::exception_ptr ex) mutable { - auto promise = promise_holder.get_promise(); - if (ex) { - try { + realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) { + try { + if (ex) { std::rethrow_exception(ex); } - catch (...) { - promise.set_error(exception_to_status()); - } - return; + auto realm = Realm::get_shared_realm(std::move(ref)); + realm_pf.promise.emplace_value(std::move(realm)); } - auto realm = Realm::get_shared_realm(std::move(ref)); - if (!realm) { - promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"}); + catch (...) { + realm_pf.promise.set_error(exception_to_status()); } - promise.emplace_value(std::move(realm)); }); auto realm = realm_pf.future.get(); before_callback_called.future.get(); @@ -3696,16 +3687,14 @@ TEST_CASE("client reset with embedded object", "[sync][pbs][client reset][embedd reset_embedded_object({local}, {remote}, expected_recovered); } SECTION("local ArraySet to an embedded object through a deep link->linklist element which is removed by the " - "remote " - "triggers a list copy") { + "remote triggers a list copy") { local.link_value->array_vals[0] = 12345; remote.link_value->array_vals.erase(remote.link_value->array_vals.begin()); TopLevelContent expected_recovered = local; reset_embedded_object({local}, {remote}, expected_recovered); } SECTION("local ArrayErase to an embedded object through a deep link->linklist element which is removed by " - "the remote " - "triggers a list copy") { + "the remote triggers a list copy") { local.link_value->array_vals.erase(local.link_value->array_vals.begin()); remote.link_value->array_vals.clear(); TopLevelContent expected_recovered = local; @@ -3833,21 +3822,19 @@ TEST_CASE("client reset with embedded object", "[sync][pbs][client reset][embedd expected_recovered.dict_values["foo"]->array_vals = local.dict_values["foo"]->array_vals; reset_embedded_object({local}, {remote}, expected_recovered); } - std::vector keys = {"new key", "", "\0"}; - for (auto key : keys) { - SECTION(util::format("both add the same dictionary key: '%1'", key)) { - EmbeddedContent new_local, new_remote; - local.dict_values[key] = new_local; - remote.dict_values[key] = new_remote; - TopLevelContent expected_recovered = remote; - expected_recovered.dict_values[key]->apply_recovery_from(*local.dict_values[key]); - // a verbatim list copy is triggered by modifications to items which were not just inserted - expected_recovered.dict_values[key]->array_vals = local.dict_values[key]->array_vals; - expected_recovered.dict_values[key]->second_level = local.dict_values[key]->second_level; - reset_embedded_object({local}, {remote}, expected_recovered); - } + SECTION("both add the same dictionary key") { + auto key = GENERATE("new key", "", "\0"); + EmbeddedContent new_local, new_remote; + local.dict_values[key] = new_local; + remote.dict_values[key] = new_remote; + TopLevelContent expected_recovered = remote; + expected_recovered.dict_values[key]->apply_recovery_from(*local.dict_values[key]); + // a verbatim list copy is triggered by modifications to items which were not just inserted + expected_recovered.dict_values[key]->array_vals = local.dict_values[key]->array_vals; + expected_recovered.dict_values[key]->second_level = local.dict_values[key]->second_level; + reset_embedded_object({local}, {remote}, expected_recovered); } - SECTION("deep modifications to inserted and swaped list items are recovered") { + SECTION("deep modifications to inserted and swapped list items are recovered") { EmbeddedContent local_added_at_begin, local_added_at_end, local_added_before_end, remote_added; size_t list_end = initial.array_values.size(); test_reset @@ -3857,14 +3844,14 @@ TEST_CASE("client reset with embedded object", "[sync][pbs][client reset][embedd auto embedded = list.create_and_insert_linked_object(0); local_added_at_begin.assign_to(embedded); embedded = list.create_and_insert_linked_object(list_end - 1); - local_added_before_end.assign_to(embedded); // this item is needed here so that move does not - // trigger a copy of the list + // this item is needed here so that move does not trigger a copy of the list + local_added_before_end.assign_to(embedded); embedded = list.create_and_insert_linked_object(list_end); local_added_at_end.assign_to(embedded); local->commit_transaction(); local->begin_transaction(); - list.swap(0, - list_end); // generates two move instructions, move(0, list_end), move(list_end - 1, 0) + // generates two move instructions, move(0, list_end), move(list_end - 1, 0) + list.swap(0, list_end); local->commit_transaction(); local->begin_transaction(); local_added_at_end.name = "should be at begin now"; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 7c8088a2086..9866213345c 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -476,7 +476,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { return resulting_set; }; - auto add_invalid_subscription = [&](SharedRealm realm) -> sync::SubscriptionSet { + auto add_invalid_subscription = [](SharedRealm realm) -> sync::SubscriptionSet { auto table = realm->read_group().get_table("class_TopLevel"); auto queryable_str_field = table->get_column_key("non_queryable_field"); auto sub_set = realm->get_latest_subscription_set().make_mutable_copy(); @@ -797,7 +797,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { // Any schema discrepancies are caught by the initial diff, so the way to make a recovery fail here is // to add and remove a column at the core level such that the schema diff passes, but instructions are // generated which will fail when applied. - reset_utils::TestClientReset::Callback make_local_changes_that_will_fail = [&](SharedRealm local_realm) { + auto make_local_changes_that_will_fail = [&](SharedRealm local_realm) { subscribe_to_and_add_objects(local_realm, num_objects_added_before); auto table = local_realm->read_group().get_table("class_TopLevel"); REQUIRE(table->size() == num_objects_added_before + num_objects_added_by_harness); @@ -810,29 +810,40 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { subscribe_to_and_add_objects(local_realm, num_objects_added_after); // these are lost! }; - reset_utils::TestClientReset::Callback verify_post_reset_state = [&, err_future = std::move(error_future)]( - SharedRealm local_realm) mutable { - auto sync_error = wait_for_future(std::move(err_future)).get(); + VersionID expected_version; + + auto store_pre_reset_state = [&](SharedRealm local_realm) { + expected_version = local_realm->read_transaction_version(); + }; + + auto verify_post_reset_state = [&, err_future = std::move(error_future)](SharedRealm local_realm) { + auto sync_error = err_future.get(); REQUIRE(before_reset_count == 1); REQUIRE(after_reset_count == 0); REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed); REQUIRE(sync_error.is_client_reset_requested()); + + // All changes should have been rolled back when recovery hit remove_column(), + // leaving the Realm in the pre-reset state local_realm->refresh(); auto table = local_realm->read_group().get_table("class_TopLevel"); ColKey added = table->get_column_key(added_property_name); - REQUIRE(!added); // partial recovery halted at remove_column() but rolled back everything in the change - // table is missing num_objects_added_after and the last commit after the latest subscription - // this is due to how recovery batches together changesets up until a subscription - const size_t expected_added_objects = num_objects_added_before - 1; - REQUIRE(table->size() == expected_added_objects + num_objects_added_by_harness); + REQUIRE(!added); + const size_t expected_added_objects = num_objects_added_before + num_objects_added_after; + REQUIRE(table->size() == num_objects_added_by_harness + expected_added_objects); size_t count_of_valid_array_data = validate_integrity_of_arrays(table); REQUIRE(count_of_valid_array_data == expected_added_objects); + + // The attempted client reset should have been recorded so that we + // don't attempt it again + REQUIRE(local_realm->read_transaction_version().version == expected_version.version + 1); }; SECTION("Recover: unsuccessful recovery leads to a manual reset") { config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session()); - test_reset->make_local_changes(std::move(make_local_changes_that_will_fail)) + test_reset->make_local_changes(make_local_changes_that_will_fail) + ->on_post_local_changes(store_pre_reset_state) ->on_post_reset(std::move(verify_post_reset_state)) ->run(); RealmConfig config_copy = config_local; @@ -849,7 +860,8 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("RecoverOrDiscard: unsuccessful reapply leads to discard") { config_local.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard; auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session()); - test_reset->make_local_changes(std::move(make_local_changes_that_will_fail)) + test_reset->make_local_changes(make_local_changes_that_will_fail) + ->on_post_local_changes(store_pre_reset_state) ->on_post_reset(std::move(verify_post_reset_state)) ->run(); @@ -918,7 +930,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { ->run(); } - SECTION("DiscardLocal: an invalid subscription made while offline becomes superceeded") { + SECTION("DiscardLocal: an invalid subscription made while offline becomes superseded") { config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; auto&& [reset_future, reset_handler] = make_client_reset_handler(); config_local.sync_config->notify_after_client_reset = reset_handler; @@ -1304,48 +1316,47 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("Recover: additive schema changes without dev mode produce an error after client reset") { const AppSession& app_session = harness.session().app_session(); app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); - { - seed_realm(config_local, ResetMode::InitiateClientReset); - // Disable dev mode so that schema changes are not allowed - app_session.admin_api.set_development_mode_to(app_session.server_app_id, false); - std::vector changed_schema = make_additive_changes(schema); - config_local.schema = changed_schema; - config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; - (void)setup_reset_handlers_for_schema_validation(config_local, changed_schema); - auto&& [error_future, err_handler] = make_error_handler(); - config_local.sync_config->error_handler = err_handler; - async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) { - REQUIRE(ref); - REQUIRE_FALSE(error); - auto realm = Realm::get_shared_realm(std::move(ref)); - // make changes to the new property - realm->begin_transaction(); - auto table = realm->read_group().get_table("class_TopLevel"); - ColKey new_col = table->get_column_key("added_oid_field"); - REQUIRE(new_col); - for (auto it = table->begin(); it != table->end(); ++it) { - it->set(new_col, ObjectId::gen()); - } - realm->commit_transaction(); - }); - auto realm = Realm::get_shared_realm(config_local); - auto err = error_future.get(); - std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding " - "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", " - "schema changes from clients are restricted when developer mode is disabled"; - std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema " - "for Realm table \"AddedClass\", schema changes from clients are restricted when " - "developer mode is disabled"; - REQUIRE_THAT(err.status.reason(), Catch::Matchers::ContainsSubstring(property_err) || - Catch::Matchers::ContainsSubstring(class_err)); - CHECK(before_reset_count == 1); - CHECK(after_reset_count == 1); - } - } + seed_realm(config_local, ResetMode::InitiateClientReset); + // Disable dev mode so that schema changes are not allowed + app_session.admin_api.set_development_mode_to(app_session.server_app_id, false); + auto cleanup = util::make_scope_exit([&]() noexcept { + const AppSession& app_session = harness.session().app_session(); + app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); + }); - // the previous section turns off dev mode, undo that now for later tests - const AppSession& app_session = harness.session().app_session(); - app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); + std::vector changed_schema = make_additive_changes(schema); + config_local.schema = changed_schema; + config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; + (void)setup_reset_handlers_for_schema_validation(config_local, changed_schema); + auto&& [error_future, err_handler] = make_error_handler(); + config_local.sync_config->error_handler = err_handler; + async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) { + REQUIRE(ref); + REQUIRE_FALSE(error); + auto realm = Realm::get_shared_realm(std::move(ref)); + // make changes to the new property + realm->begin_transaction(); + auto table = realm->read_group().get_table("class_TopLevel"); + ColKey new_col = table->get_column_key("added_oid_field"); + REQUIRE(new_col); + for (auto it = table->begin(); it != table->end(); ++it) { + it->set(new_col, ObjectId::gen()); + } + realm->commit_transaction(); + }); + auto realm = Realm::get_shared_realm(config_local); + auto err = error_future.get(); + std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding " + "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", " + "schema changes from clients are restricted when developer mode is disabled"; + std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema " + "for Realm table \"AddedClass\", schema changes from clients are restricted when " + "developer mode is disabled"; + REQUIRE_THAT(err.status.reason(), Catch::Matchers::ContainsSubstring(property_err) || + Catch::Matchers::ContainsSubstring(class_err)); + CHECK(before_reset_count == 1); + CHECK(after_reset_count == 1); + } } TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") { @@ -4295,67 +4306,59 @@ TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset] mutable_subscription.commit(); }; - auto before_callback_called = util::make_promise_future(); - auto after_callback_called = util::make_promise_future(); realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover; realm_config.sync_config->subscription_initializer = subscription_callback; - realm_config.sync_config->on_sync_client_event_hook = - [&, client_reset_triggered = false](std::weak_ptr weak_sess, - const SyncClientHookData& event_data) mutable { - auto sess = weak_sess.lock(); - if (!sess) { - return SyncClientHookAction::NoAction; - } - if (sess->path() != realm_config.path) { - return SyncClientHookAction::NoAction; - } + bool client_reset_triggered = false; + realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr weak_sess, + const SyncClientHookData& event_data) { + auto sess = weak_sess.lock(); + if (!sess) { + return SyncClientHookAction::NoAction; + } + if (sess->path() != realm_config.path) { + return SyncClientHookAction::NoAction; + } - if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) { - return SyncClientHookAction::NoAction; - } + if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) { + return SyncClientHookAction::NoAction; + } - if (client_reset_triggered) { - return SyncClientHookAction::NoAction; - } - client_reset_triggered = true; - reset_utils::trigger_client_reset(harness.session().app_session()); - return SyncClientHookAction::EarlyReturn; - }; + if (client_reset_triggered) { + return SyncClientHookAction::NoAction; + } - realm_config.sync_config->notify_before_client_reset = - [promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))]( - std::shared_ptr realm) mutable { - CHECK(realm->schema_version() == 1); - promise.get_promise().emplace_value(); - }; + client_reset_triggered = true; + reset_utils::trigger_client_reset(harness.session().app_session(), *sess); + return SyncClientHookAction::SuspendWithRetryableError; + }; - realm_config.sync_config->notify_after_client_reset = - [promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))]( - std::shared_ptr realm, ThreadSafeReference, bool) mutable { - CHECK(realm->schema_version() == 1); - promise.get_promise().emplace_value(); - }; + auto before_callback_called = util::make_promise_future(); + realm_config.sync_config->notify_before_client_reset = [&](std::shared_ptr realm) { + CHECK(realm->schema_version() == 1); + before_callback_called.promise.emplace_value(); + }; + + auto after_callback_called = util::make_promise_future(); + realm_config.sync_config->notify_after_client_reset = [&](std::shared_ptr realm, ThreadSafeReference, + bool) { + CHECK(realm->schema_version() == 1); + after_callback_called.promise.emplace_value(); + }; auto realm_task = Realm::get_synchronized_realm(realm_config); auto realm_pf = util::make_promise_future(); - realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))]( - ThreadSafeReference ref, std::exception_ptr ex) mutable { - auto promise = promise_holder.get_promise(); - if (ex) { - try { + realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) { + auto& promise = realm_pf.promise; + try { + if (ex) { std::rethrow_exception(ex); } - catch (...) { - promise.set_error(exception_to_status()); - } - return; + promise.emplace_value(Realm::get_shared_realm(std::move(ref))); } - auto realm = Realm::get_shared_realm(std::move(ref)); - if (!realm) { - promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"}); + catch (...) { + promise.set_error(exception_to_status()); } - promise.emplace_value(std::move(realm)); }); auto realm = realm_pf.future.get(); before_callback_called.future.get(); diff --git a/test/object-store/util/sync/baas_admin_api.cpp b/test/object-store/util/sync/baas_admin_api.cpp index 461294aa6d8..0b8a0074bfb 100644 --- a/test/object-store/util/sync/baas_admin_api.cpp +++ b/test/object-store/util/sync/baas_admin_api.cpp @@ -412,8 +412,7 @@ app::Response AdminAPIEndpoint::post(std::string body) const nlohmann::json AdminAPIEndpoint::post_json(nlohmann::json body) const { auto resp = post(body.dump()); - REALM_ASSERT_EX(resp.http_status_code >= 200 && resp.http_status_code < 300, - util::format("url: %1 request: %2, reply: %3", m_url, body.dump(), resp.body)); + REALM_ASSERT_EX(resp.http_status_code >= 200 && resp.http_status_code < 300, m_url, body.dump(), resp.body); return nlohmann::json::parse(resp.body.empty() ? "{}" : resp.body); } @@ -1074,7 +1073,7 @@ AppSession create_app(const AppCreateConfig& config) // Create the schemas in two passes: first populate just the primary key and // partition key, then add the rest of the properties. This ensures that the - // targest of links exist before adding the links. + // targets of links exist before adding the links. std::vector> object_schema_to_create; BaasRuleBuilder rule_builder(config.schema, config.partition_key, mongo_service_name, config.mongo_dbname, static_cast(config.flx_sync_config)); @@ -1178,6 +1177,17 @@ AppSession create_app(const AppCreateConfig& config) {"version", 1}, }); + // Wait for initial sync to complete, as connecting while this is happening + // causes various problems + bool any_sync_types = std::any_of(config.schema.begin(), config.schema.end(), [](auto& object_schema) { + return object_schema.table_type == ObjectSchema::ObjectType::TopLevel; + }); + if (any_sync_types) { + timed_sleeping_wait_for([&] { + return session.is_initial_sync_complete(app_id); + }); + } + return {client_app_id, app_id, session, config}; } diff --git a/test/object-store/util/sync/sync_test_utils.cpp b/test/object-store/util/sync/sync_test_utils.cpp index 4426d82adb0..43f8bc90612 100644 --- a/test/object-store/util/sync/sync_test_utils.cpp +++ b/test/object-store/util/sync/sync_test_utils.cpp @@ -432,7 +432,7 @@ struct FakeLocalClientReset : public TestClientReset { using _impl::client_reset::perform_client_reset_diff; constexpr bool recovery_is_allowed = true; perform_client_reset_diff(*local_db, *remote_db, fake_ident, *logger, m_mode, recovery_is_allowed, - nullptr, nullptr, [](int64_t) {}); + nullptr, [](int64_t) {}); remote_realm->close(); if (m_on_post_reset) { @@ -507,39 +507,16 @@ void wait_for_num_objects_in_atlas(std::shared_ptr user, const AppSess std::chrono::minutes(15), std::chrono::milliseconds(500)); } -void trigger_client_reset(const AppSession& app_session) +void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session) { - // cause a client reset by restarting the sync service - // this causes the server's sync history to be resynthesized - auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id); - auto baas_sync_config = app_session.admin_api.get_config(app_session.server_app_id, baas_sync_service); - - REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id)); - app_session.admin_api.disable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config); - timed_sleeping_wait_for([&] { - return app_session.admin_api.is_sync_terminated(app_session.server_app_id); - }); - app_session.admin_api.enable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config); - REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id)); - if (app_session.config.dev_mode_enabled) { // dev mode is not sticky across a reset - app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); - } - - // In FLX sync, the server won't let you connect until the initial sync is complete. With PBS tho, we need - // to make sure we've actually copied all the data from atlas into the realm history before we do any of - // our remote changes. - if (!app_session.config.flx_sync_config) { - timed_sleeping_wait_for([&] { - return app_session.admin_api.is_initial_sync_complete(app_session.server_app_id); - }); - } + auto file_ident = SyncSession::OnlyForTesting::get_file_ident(sync_session); + REQUIRE(file_ident.ident != 0); + app_session.admin_api.trigger_client_reset(app_session.server_app_id, file_ident.ident); } void trigger_client_reset(const AppSession& app_session, const SharedRealm& realm) { - auto file_ident = SyncSession::OnlyForTesting::get_file_ident(*realm->sync_session()); - REQUIRE(file_ident.ident != 0); - app_session.admin_api.trigger_client_reset(app_session.server_app_id, file_ident.ident); + trigger_client_reset(app_session, *realm->sync_session()); } struct BaasClientReset : public TestClientReset { diff --git a/test/object-store/util/sync/sync_test_utils.hpp b/test/object-store/util/sync/sync_test_utils.hpp index 195f8eb2f24..879c74cb563 100644 --- a/test/object-store/util/sync/sync_test_utils.hpp +++ b/test/object-store/util/sync/sync_test_utils.hpp @@ -232,7 +232,7 @@ void wait_for_object_to_persist_to_atlas(std::shared_ptr user, const A void wait_for_num_objects_in_atlas(std::shared_ptr user, const AppSession& app_session, const std::string& schema_name, size_t expected_size); -void trigger_client_reset(const AppSession& app_session); +void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session); void trigger_client_reset(const AppSession& app_session, const SharedRealm& realm); #endif // REALM_ENABLE_AUTH_TESTS diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index 0c2f1962d12..c6beed04601 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -854,6 +854,7 @@ void mark_as_synchronized(DB& db) progress.upload.last_integrated_server_version = current_version; sync::VersionInfo info_out; history.set_sync_progress(progress, nullptr, info_out); + history.set_client_file_ident({1, 0}, false); } void expect_reset(unit_test::TestContext& test_context, DB& target, DB& fresh, ClientResyncMode mode, @@ -1158,7 +1159,7 @@ SubscriptionSet add_subscription(SubscriptionStore& sub_store, const std::string auto mut = sub_store.get_latest().make_mutable_copy(); mut.insert_or_assign(name, q); if (state) { - mut.update_state(*state); + mut.set_state(*state); } return mut.commit(); } @@ -1202,7 +1203,9 @@ TEST(ClientReset_DiscardLocal_DiscardsPendingSubscriptions) } } -TEST(ClientReset_DiscardLocal_MakesAwaitingMarkActiveSubscriptionsComplete) +TEST_TYPES(ClientReset_DiscardLocal_MakesAwaitingMarkActiveSubscriptionsComplete, + std::integral_constant, + std::integral_constant) { SHARED_GROUP_TEST_PATH(path_1); SHARED_GROUP_TEST_PATH(path_2); @@ -1216,7 +1219,7 @@ TEST(ClientReset_DiscardLocal_MakesAwaitingMarkActiveSubscriptionsComplete) auto set = add_subscription(*sub_store, "complete", query, SubscriptionSet::State::AwaitingMark); auto future = set.get_state_change_notification(SubscriptionSet::State::Complete); - expect_reset(test_context, *db, *db_fresh, ClientResyncMode::DiscardLocal, sub_store.get()); + expect_reset(test_context, *db, *db_fresh, TEST_TYPE::value, sub_store.get()); CHECK_EQUAL(future.get(), SubscriptionSet::State::Complete); CHECK_EQUAL(set.state(), SubscriptionSet::State::AwaitingMark); @@ -1224,6 +1227,200 @@ TEST(ClientReset_DiscardLocal_MakesAwaitingMarkActiveSubscriptionsComplete) CHECK_EQUAL(set.state(), SubscriptionSet::State::Complete); } +TEST(ClientReset_Recover_DoesNotCompletePendingSubscriptions) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + tr.add_table_with_primary_key("class_table", type_Int, "pk"); + }); + + auto tr = db->start_read(); + auto sub_store = SubscriptionStore::create(db); + auto query = tr->get_table("class_table")->where(); + + add_subscription(*sub_store, "complete", query, SubscriptionSet::State::Complete); + + std::vector> futures; + for (int i = 0; i < 3; ++i) { + auto subs = add_subscription(*sub_store, util::format("pending %1", i), query); + futures.push_back(subs.get_state_change_notification(SubscriptionSet::State::Complete)); + } + + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, sub_store.get()); + + for (auto& fut : futures) { + CHECK_NOT(fut.is_ready()); + } + + auto pending = sub_store->get_pending_subscriptions(); + CHECK_EQUAL(pending.size(), 3); + for (int i = 0; i < 3; ++i) { + CHECK_EQUAL(pending[i].size(), i + 2); + CHECK_EQUAL(std::prev(pending[i].end())->name, util::format("pending %1", i)); + } +} + +TEST(ClientReset_Recover_UpdatesRemoteServerVersions) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + tr.add_table_with_primary_key("class_table", type_Int, "pk"); + }); + + // Create local unsynchronized changes + for (int i = 0; i < 5; ++i) { + auto wt = db->start_write(); + auto table = wt->get_table("class_table"); + table->create_object_with_primary_key(i); + wt->commit(); + } + + // Change the last seen server version for the freshly download DB + { + sync::SyncProgress progress; + // Set to a valid but incorrect client version which should not be + // copied over by client reset + auto client_version = db_fresh->get_version_of_latest_snapshot() - 1; + progress.download.last_integrated_client_version = client_version; + progress.upload.client_version = client_version; + + // Server versions are opaque increasing values, so they can be whatever. + // Set to known values that we can verify are used + progress.latest_server_version.version = 123; + progress.latest_server_version.salt = 456; + progress.download.server_version = 123; + progress.upload.last_integrated_server_version = 789; + + sync::VersionInfo info_out; + auto& history = static_cast(db_fresh->get_replication())->get_history(); + history.set_sync_progress(progress, nullptr, info_out); + } + + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, nullptr); + + auto& history = static_cast(db->get_replication())->get_history(); + history.ensure_updated(db->get_version_of_latest_snapshot()); + + version_type current_client_version; + SaltedFileIdent file_ident; + SyncProgress sync_progress; + history.get_status(current_client_version, file_ident, sync_progress); + + CHECK_EQUAL(file_ident.ident, 100); + CHECK_EQUAL(file_ident.salt, 200); + CHECK_EQUAL(sync_progress.upload.client_version, 0); + CHECK_EQUAL(sync_progress.download.last_integrated_client_version, 0); + CHECK_EQUAL(sync_progress.upload.last_integrated_server_version, 123); + CHECK_EQUAL(sync_progress.download.server_version, 123); + + std::vector uploadable_changesets; + version_type locked_server_version; + history.find_uploadable_changesets(sync_progress.upload, db->get_version_of_latest_snapshot(), + uploadable_changesets, locked_server_version); + + CHECK_EQUAL(uploadable_changesets.size(), 5); + for (auto& uc : uploadable_changesets) { + CHECK_EQUAL(uc.progress.last_integrated_server_version, 123); + } +} + +TEST(ClientReset_Recover_UploadableBytes) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + tr.add_table_with_primary_key("class_table", type_Int, "pk"); + }); + + // Create local unsynchronized changes + for (int i = 0; i < 5; ++i) { + auto wt = db->start_write(); + auto table = wt->get_table("class_table"); + table->create_object_with_primary_key(i); + wt->commit(); + } + + // Create some of the same objects in the fresh realm so that the post-reset + // uploadable_bytes should be different from pre-reset (but still not zero) + { + auto wt = db_fresh->start_write(); + auto table = wt->get_table("class_table"); + for (int i = 0; i < 3; ++i) { + table->create_object_with_primary_key(i); + } + wt->commit(); + } + + auto& history = static_cast(db->get_replication())->get_history(); + uint_fast64_t unused, pre_reset_uploadable_bytes; + history.get_upload_download_bytes(db.get(), unused, unused, unused, pre_reset_uploadable_bytes, unused); + CHECK_GREATER(pre_reset_uploadable_bytes, 0); + + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, nullptr); + + uint_fast64_t post_reset_uploadable_bytes; + history.get_upload_download_bytes(db.get(), unused, unused, unused, post_reset_uploadable_bytes, unused); + CHECK_GREATER(post_reset_uploadable_bytes, 0); + CHECK_GREATER(pre_reset_uploadable_bytes, post_reset_uploadable_bytes); +} + +TEST(ClientReset_Recover_ListsAreOnlyCopiedOnce) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + auto table = tr.add_table_with_primary_key("class_table", type_Int, "pk"); + auto col = table->add_column_list(type_Int, "list"); + auto list = table->create_object_with_primary_key(0).get_list(col); + list.add(0); + list.add(1); + list.add(2); + }); + + // Perform some conflicting list writes which aren't recoverable and require + // a copy + { // modify local + auto wt = db->start_write(); + auto list = wt->get_table("class_table")->begin()->get_list("list"); + list.remove(0); + list.add(4); + wt->commit_and_continue_writing(); + list.remove(0); + list.add(5); + wt->commit_and_continue_writing(); + list.remove(0); + list.add(6); + wt->commit(); + } + { // modify remote + auto wt = db_fresh->start_write(); + auto list = wt->get_table("class_table")->begin()->get_list("list"); + list.clear(); + list.add(7); + list.add(8); + list.add(9); + wt->commit(); + } + + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, nullptr); + + // List should match the pre-reset local state + auto rt = db->start_read(); + auto list = rt->get_table("class_table")->begin()->get_list("list"); + CHECK_EQUAL(list.size(), 3); + CHECK_EQUAL(list.get(0), 4); + CHECK_EQUAL(list.get(1), 5); + CHECK_EQUAL(list.get(2), 6); + + // The second and third changeset should now be empty and so excluded from + // get_local_changes() + auto repl = static_cast(db->get_replication()); + auto changes = repl->get_history().get_local_changes(rt->get_version()); + CHECK_EQUAL(changes.size(), 1); +} + TEST(ClientReset_Recover_RecoverableChangesOnListsAfterUnrecoverableAreNotDuplicated) { SHARED_GROUP_TEST_PATH(path_1); @@ -1264,10 +1461,7 @@ TEST(ClientReset_Recover_RecoverableChangesOnListsAfterUnrecoverableAreNotDuplic wt->commit(); } - bool did_reset = _impl::client_reset::perform_client_reset( - *test_context.logger, *db, *db_fresh, ClientResyncMode::Recover, nullptr, nullptr, {100, 200}, - sub_store.get(), [](int64_t) {}, true); - CHECK(did_reset); + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, sub_store.get()); // List should match the pre-reset local state auto rt = db->start_read(); @@ -1277,14 +1471,173 @@ TEST(ClientReset_Recover_RecoverableChangesOnListsAfterUnrecoverableAreNotDuplic CHECK_EQUAL(list.get(1), 4); CHECK_EQUAL(list.get(2), 5); - // There should only be one non-empty changeset as the second one did not - // write anything + // The second changeset should now be empty and so excluded from get_local_changes() auto repl = static_cast(db->get_replication()); auto changes = repl->get_history().get_local_changes(rt->get_version()); - auto non_empty_count = std::count_if(changes.begin(), changes.end(), [](auto&& c) { - return c.changeset.size() > 0; + CHECK_EQUAL(changes.size(), 1); +} + +// Apply uploaded changes in src to dst as if they had been exchanged by sync +void apply_changes(DB& src, DB& dst) +{ + auto& src_history = static_cast(src.get_replication())->get_history(); + auto& dst_history = static_cast(dst.get_replication())->get_history(); + + version_type dst_client_version; + SaltedFileIdent dst_file_ident; + SyncProgress dst_progress; + dst_history.get_status(dst_client_version, dst_file_ident, dst_progress); + + std::vector> decompressed_changesets; + std::vector remote_changesets; + auto local_changes = src_history.get_local_changes(src.get_version_of_latest_snapshot()); + for (auto& change : local_changes) { + decompressed_changesets.emplace_back(); + auto& buffer = decompressed_changesets.back(); + ChunkedBinaryInputStream is{change.changeset}; + util::compression::decompress_nonportable(is, buffer); + + // Arbitrary non-zero file ident + file_ident_type file_ident = 2; + // Treat src's changesets as being "after" dst's + uint_fast64_t timestamp = -1; + remote_changesets.emplace_back(change.version, dst_progress.upload.last_integrated_server_version, + BinaryData(buffer.data(), buffer.size()), timestamp, file_ident); + } + + dst_progress.download.server_version += remote_changesets.size(); + dst_progress.latest_server_version.version += remote_changesets.size(); + + util::NullLogger logger; + VersionInfo new_version; + dst_history.integrate_server_changesets(dst_progress, nullptr, remote_changesets, new_version, + DownloadBatchState::SteadyState, logger, dst.start_read()); +} + +TEST(ClientReset_Recover_ReciprocalListChanges) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + auto table = tr.add_table_with_primary_key("class_table", type_Int, "pk"); + auto col = table->add_column_list(type_Int, "list"); + auto list = table->create_object_with_primary_key(0).get_list(col); + for (int i = 0; i < 5; ++i) { + list.add(i * 10); + } }); - CHECK_EQUAL(non_empty_count, 1); + + { + auto wt = db->start_write(); + auto list = wt->get_table("class_table")->begin()->get_list("list"); + for (int i = 0; i < 5; ++i) { + list.insert(i * 2 + 1, i * 10 + 1); + } + // list is now [0, 1, 10, 11, 20, 21, 30, 31, 40, 41] + wt->commit(); + } + + { + auto wt = db_fresh->start_write(); + auto list = wt->get_table("class_table")->begin()->get_list("list"); + for (int i = 0; i < 5; ++i) { + list.insert(i * 2 + 1, i * 10 + 2); + } + // list is now [0, 2, 10, 12, 20, 22, 30, 32, 40, 42] + wt->commit(); + } + + // Apply the changes in db_fresh to db as if it was a changeset downloaded + // from the server. This creates reciprocal history for the unuploaded + // changeset in db. + // list is now [0, 1, 2, 10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42] + apply_changes(*db_fresh, *db); + + // The local realm is fully up-to-date with the server, so this client reset + // shouldn't modify the group. However, if it reapplied the original changesets + // and not the reciprocal history, it'd result in the list being + // [0, 1, 2, 11, 10, 21, 12, 31, 20, 41, 22, 30, 32, 40, 42] + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, nullptr); + + auto rt = db->start_read(); + auto list = rt->get_table("class_table")->begin()->get_list("list"); + CHECK_OR_RETURN(list.size() == 15); + for (int i = 0; i < 5; ++i) { + CHECK_EQUAL(list[i * 3], i * 10); + CHECK_EQUAL(list[i * 3 + 1], i * 10 + 1); + CHECK_EQUAL(list[i * 3 + 2], i * 10 + 2); + } +} + +TEST(ClientReset_Recover_UpdatesReciprocalHistory) +{ + SHARED_GROUP_TEST_PATH(path_1); + SHARED_GROUP_TEST_PATH(path_2); + SHARED_GROUP_TEST_PATH(path_3); + + auto [db, db_fresh] = prepare_db(path_1, path_2, [&](Transaction& tr) { + auto table = tr.add_table_with_primary_key("class_table", type_Int, "pk"); + auto col = table->add_column_list(type_Int, "list"); + table->create_object_with_primary_key(0).get_list(col).add(0); + }); + + { // local online write that doesn't get uploaded + auto wt = db->start_write(); + // This instruction is merged with the add in the remote write, + // generating reciprocal history. It is then discarded when replaying + // onto the fresh realm in the client reset as the object will no longer + // exist at that point + wt->get_table("class_table")->begin()->get_list("list").add(1); + // An instruction that won't get discarded when replaying to ensure + // the changeset remains non-empty + wt->get_table("class_table")->create_object_with_primary_key(1); + wt->commit(); + } + + { // remote write which gets sent to the client in a DOWNLOAD + auto wt = db_fresh->start_write(); + wt->get_table("class_table")->begin()->get_list("list").add(2); + wt->commit(); + } + + // db now has a changeset waiting to be uploaded with both a changeset + // and reciprocal transform + apply_changes(*db_fresh, *db); + + { // Freshly downloaded client reset realm doesn't have the object + auto wt = db_fresh->start_write(); + wt->get_table("class_table")->begin()->remove(); + wt->commit(); + } + + // Make a copy as client reset will delete the fresh realm + mark_as_synchronized(*db_fresh); + db_fresh->write_copy(path_3, nullptr); + + // client reset will discard the recovered array insertion as the object + // doesn't exist, but keep the object creation + expect_reset(test_context, *db, *db_fresh, ClientResyncMode::Recover, nullptr); + + // Recreate the object and add a different value to the list + { + db_fresh = DB::create(make_client_replication(), path_3); + auto wt = db_fresh->start_write(); + wt->get_table("class_table")->create_object_with_primary_key(0).get_list("list").add(3); + wt->commit(); + } + + // If the client failed to discard the old reciprocal transform when performing + // the client reset this'll merge the ArrayInsert with the discarded ArrayInsert, + // and then throw an exception because prior_size is now incorrect + apply_changes(*db_fresh, *db); + + // Sanity check the end state + auto rt = db->start_read(); + auto table = rt->get_table("class_table"); + CHECK_OR_RETURN(table->size() == 2); + auto list = table->get_object(1).get_list("list"); + CHECK_OR_RETURN(list.size() == 1); + CHECK_EQUAL(list.get(0), 3); } } // unnamed namespace diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp index 7398878d936..dc528d5a84f 100644 --- a/test/test_sync_subscriptions.cpp +++ b/test/test_sync_subscriptions.cpp @@ -128,7 +128,7 @@ TEST(Sync_SubscriptionStoreStateUpdates) CHECK(inserted); CHECK_NOT(it == out.end()); - out.update_state(SubscriptionSet::State::Complete); + out.set_state(SubscriptionSet::State::Complete); out.commit(); } @@ -165,21 +165,18 @@ TEST(Sync_SubscriptionStoreStateUpdates) } // Mark the version 2 set as complete. - { - auto latest_mutable = store->get_mutable_by_version(2); - latest_mutable.update_state(SubscriptionSet::State::Complete); - latest_mutable.commit(); - } + store->update_state(2, SubscriptionSet::State::Complete); - // There should now only be one set, version 2, that is complete. Trying to get version 1 should throw an error. + // There should now only be one set, version 2, that is complete. Trying to + // get version 1 should report that it was superseded { auto active = store->get_active(); auto latest = store->get_latest(); CHECK(active.version() == latest.version()); CHECK(active.state() == SubscriptionSet::State::Complete); - // By marking version 2 as complete version 1 will get superceded and removed. - CHECK_THROW(store->get_mutable_by_version(1), KeyNotFound); + // By marking version 2 as complete version 1 will get superseded and removed. + CHECK_EQUAL(store->get_by_version(1).state(), SubscriptionSet::State::Superseded); } { @@ -305,15 +302,13 @@ TEST(Sync_SubscriptionStoreNotifications) CHECK_EQUAL(notification_futures[0].get(), SubscriptionSet::State::Pending); // This should also return immediately with a ready future because the subset is in the correct state. - CHECK_EQUAL(store->get_mutable_by_version(1).get_state_change_notification(SubscriptionSet::State::Pending).get(), + CHECK_EQUAL(store->get_by_version(1).get_state_change_notification(SubscriptionSet::State::Pending).get(), SubscriptionSet::State::Pending); // This should not be ready yet because we haven't updated its state. CHECK_NOT(notification_futures[1].is_ready()); - sub_set = store->get_mutable_by_version(2); - sub_set.update_state(SubscriptionSet::State::Bootstrapping); - std::move(sub_set).commit(); + store->update_state(2, SubscriptionSet::State::Bootstrapping); // Now we should be able to get the future result because we updated the state. CHECK_EQUAL(notification_futures[1].get(), SubscriptionSet::State::Bootstrapping); @@ -322,9 +317,7 @@ TEST(Sync_SubscriptionStoreNotifications) CHECK_NOT(notification_futures[2].is_ready()); // Update the state to complete - skipping the bootstrapping phase entirely. - sub_set = store->get_mutable_by_version(3); - sub_set.update_state(SubscriptionSet::State::Complete); - sub_set.commit(); + store->update_state(3, SubscriptionSet::State::Complete); // Now we should be able to get the future result because we updated the state and skipped the bootstrapping // phase. @@ -334,10 +327,7 @@ TEST(Sync_SubscriptionStoreNotifications) std::string error_msg = "foo bar bizz buzz. i'm an error string for this test!"; CHECK_NOT(notification_futures[3].is_ready()); auto old_sub_set = store->get_by_version(4); - sub_set = store->get_mutable_by_version(4); - sub_set.update_state(SubscriptionSet::State::Bootstrapping); - sub_set.update_state(SubscriptionSet::State::Error, std::string_view(error_msg)); - sub_set.commit(); + store->update_state(4, SubscriptionSet::State::Error, std::string_view(error_msg)); CHECK_EQUAL(old_sub_set.state(), SubscriptionSet::State::Pending); CHECK(old_sub_set.error_str().is_null()); @@ -365,14 +355,12 @@ TEST(Sync_SubscriptionStoreNotifications) old_sub_set = store->get_by_version(5); - sub_set = store->get_mutable_by_version(6); - sub_set.update_state(SubscriptionSet::State::Complete); - sub_set.commit(); + store->update_state(6, SubscriptionSet::State::Complete); CHECK_EQUAL(notification_futures[4].get(), SubscriptionSet::State::Superseded); CHECK_EQUAL(notification_futures[5].get(), SubscriptionSet::State::Complete); - // Also check that new requests for the superceded sub set get filled immediately. + // Also check that new requests for the superseded sub set get filled immediately. CHECK_EQUAL(old_sub_set.get_state_change_notification(SubscriptionSet::State::Complete).get(), SubscriptionSet::State::Superseded); old_sub_set.refresh(); @@ -389,11 +377,7 @@ TEST(Sync_SubscriptionStoreNotifications) auto mut_set = store->get_latest().make_mutable_copy(); auto waitable_set = mut_set.commit(); - { - mut_set = store->get_mutable_by_version(waitable_set.version()); - mut_set.update_state(SubscriptionSet::State::Complete); - mut_set.commit(); - } + store->update_state(waitable_set.version(), SubscriptionSet::State::Complete); auto fut = waitable_set.get_state_change_notification(SubscriptionSet::State::Complete); CHECK(fut.is_ready()); @@ -469,13 +453,8 @@ TEST(Sync_SubscriptionStoreNextPendingVersion) sub_set = mut_sub_set.commit(); auto pending_set = sub_set.version(); - mut_sub_set = store->get_mutable_by_version(complete_set); - mut_sub_set.update_state(SubscriptionSet::State::Complete); - mut_sub_set.commit(); - - mut_sub_set = store->get_mutable_by_version(bootstrapping_set); - mut_sub_set.update_state(SubscriptionSet::State::Bootstrapping); - mut_sub_set.commit(); + store->update_state(complete_set, SubscriptionSet::State::Complete); + store->update_state(bootstrapping_set, SubscriptionSet::State::Bootstrapping); auto pending_version = store->get_next_pending_version(0); CHECK(pending_version); @@ -946,7 +925,7 @@ TEST(Sync_MutableSubscriptionSetOperations) // This is an empty subscription set. auto out2 = store->get_active().make_mutable_copy(); out2.insert_or_assign("c sub", query_c); - out2.import(subs); + out2.import(std::move(subs)); // "c sub" is erased when 'import' is used. CHECK_EQUAL(out2.size(), 2); // insert "c sub" again.