Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2209 Treat completing a client reset as receiving a MARK message #7921

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* Fixed bug which would prevent eventual consistency during conflict resolution. Affected clients would experience data divergence and potentially consistency errors as a result. ([PR #7955](https://github.com/realm/realm-core/pull/7955), since v14.8.0)
* Fixed issues loading the native Realm libraries on Linux ARMv7 systems when they linked against our bundled OpenSSL resulting in errors like `unexpected reloc type 0x03`. ([#7947](https://github.com/realm/realm-core/issues/7947), since v14.1.0)
* `Realm::convert()` would sometimes incorrectly throw an exception claiming that there were unuploaded local changes when the source Realm is a synchronized Realm ([#7966](https://github.com/realm/realm-core/issues/7966), since v10.7.0).
* Automatic client reset handling now reports download completion as soon as all changes from the newly downloaded file have been applied to the main Realm file rather than at an inconsistent time afterwards ([PR #7921](https://github.com/realm/realm-core/pull/7921)).
* Cycle detection for automatic client reset handling is now more precise. Previously errors which occurred after all recovered changesets were uploaded would sometimes be incorrectly considered a cycle and skip automatic handling. ([PR #7921](https://github.com/realm/realm-core/pull/7921)).

### Breaking changes
* None.
Expand Down
5 changes: 5 additions & 0 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,11 @@ void Realm::convert(const Config& config, bool merge_into_existing)
}
}

bool Realm::has_pending_unuploaded_changes() const noexcept
{
return !m_transaction->get_history()->no_pending_local_changes(m_transaction->get_version());
}

OwnedBinaryData Realm::write_copy()
{
verify_thread();
Expand Down
2 changes: 2 additions & 0 deletions src/realm/object-store/shared_realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ class Realm : public std::enable_shared_from_this<Realm> {
// Returns `true` if the Realm is frozen, `false` otherwise.
bool is_frozen() const;

bool has_pending_unuploaded_changes() const noexcept;

// Returns true if the Realm is either in a read or frozen transaction
bool is_in_read_transaction() const
{
Expand Down
59 changes: 5 additions & 54 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
// Can be called from any thread.
util::Future<std::string> send_test_command(std::string body);

void handle_pending_client_reset_acknowledgement();

// Can be called from any thread.
std::string get_appservices_connection_id();

Expand Down Expand Up @@ -779,14 +777,6 @@ void SessionImpl::on_resumed()
}
}

void SessionImpl::handle_pending_client_reset_acknowledgement()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.handle_pending_client_reset_acknowledgement();
}
}

bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Ignore the message if the session is not active or a steady state message
Expand Down Expand Up @@ -1354,8 +1344,12 @@ void SessionWrapper::actualize()
}
}

if (!m_client_reset_config)
if (!m_client_reset_config) {
check_progress(); // Throws
if (auto pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen())) {
m_sess->logger.info(util::LogCategory::reset, "Found pending client reset tracker: %1", *pending_reset);
}
}
}

void SessionWrapper::force_close()
Expand Down Expand Up @@ -1651,49 +1645,6 @@ util::Future<std::string> SessionWrapper::send_test_command(std::string body)
return m_sess->send_test_command(std::move(body));
}

void SessionWrapper::handle_pending_client_reset_acknowledgement()
{
REALM_ASSERT(!m_finalized);

auto has_pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen());
if (!has_pending_reset) {
return; // nothing to do
}

m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);

// Now that the client reset merge is complete, wait for the changes to synchronize with the server
async_wait_for(
true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
if (status == ErrorCodes::OperationAborted) {
return;
}
auto& logger = self->m_sess->logger;
if (!status.is_ok()) {
logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
status);
return;
}

logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);

auto tr = self->m_db->start_write();
auto cur_pending_reset = PendingResetStore::has_pending_reset(*tr);
if (!cur_pending_reset) {
logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
return;
}
if (*cur_pending_reset == pending_reset) {
logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
}
else {
logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
}
PendingResetStore::clear_pending_reset(*tr);
tr->commit();
});
}

std::string SessionWrapper::get_appservices_connection_id()
{
auto pf = util::make_promise_future<std::string>();
Expand Down
2 changes: 0 additions & 2 deletions src/realm/sync/network/default_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
initiate_resolve();
}

virtual ~DefaultWebSocketImpl() = default;

void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
{
m_websocket.async_write_binary(data.data(), data.size(),
Expand Down
5 changes: 1 addition & 4 deletions src/realm/sync/network/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1414,10 +1414,7 @@ class Service::Impl {
bool resolver_thread_started = m_resolver_thread.joinable();
if (resolver_thread_started)
return;
auto func = [this]() noexcept {
resolver_thread();
};
m_resolver_thread = std::thread{std::move(func)};
m_resolver_thread = std::thread{&Impl::resolver_thread, this};
}

void add_wait_oper(LendersWaitOperPtr op)
Expand Down
28 changes: 20 additions & 8 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <realm/sync/instruction_replication.hpp>
#include <realm/sync/noinst/client_reset.hpp>
#include <realm/sync/noinst/client_reset_recovery.hpp>
#include <realm/sync/noinst/pending_reset_store.hpp>
#include <realm/transaction.hpp>
#include <realm/util/compression.hpp>
#include <realm/util/features.h>
Expand Down Expand Up @@ -90,7 +91,7 @@ void ClientHistory::set_history_adjustments(
for (size_t i = 0, size = m_arrays->remote_versions.size(); i < size; ++i) {
m_arrays->remote_versions.set(i, server_version.version);
version_type version = m_sync_history_base_version + i;
logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version + 1,
logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version,
m_arrays->changesets.get(i).size(), server_version.version);
}
}
Expand Down Expand Up @@ -335,16 +336,15 @@ void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, boo
}


// Overriding member function in realm::sync::ClientHistoryBase
void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
VersionInfo& version_info)
VersionInfo& version_info, util::Logger& logger)
{
TransactionRef wt = m_db->start_write(); // Throws
version_type local_version = wt->get_version();
ensure_updated(local_version); // Throws
prepare_for_write(); // Throws

update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws

// Note: This transaction produces an empty changeset. Empty changesets are
// not uploaded to the server.
Expand Down Expand Up @@ -489,17 +489,17 @@ void ClientHistory::integrate_server_changesets(
// During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
// synthetic server version that represents synthetic changesets generated from state on the server.
if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
// Always update progress for download messages from steady state.
else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
auto partial_progress = progress;
partial_progress.download.server_version = last_changeset.remote_version;
partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
update_sync_progress(partial_progress, downloadable_bytes); // Throws
update_sync_progress(partial_progress, downloadable_bytes, logger); // Throws
}
else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
if (run_in_write_tr) {
run_in_write_tr(*transact, changesets_for_cb);
Expand Down Expand Up @@ -876,7 +876,8 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
}


void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
util::Logger& logger)
{
Array& root = m_arrays->root;

Expand Down Expand Up @@ -947,6 +948,17 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
root.set(s_progress_uploaded_bytes_iip,
RefOrTagged::make_tagged(uploaded_bytes)); // Throws

if (previous_upload_client_version < progress.upload.client_version) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this assume that if we make any upload progress that we'll have fully uploaded all changes and we know for sure we aren't going to get another client reset from any recovered changesets? maybe now that we have compensating writes that doesn't really matter as much?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uploading changesets should either result in the server acknowledging the upload or sending a client reset and not both, so once our UPLOAD is acked the window for getting a client reset due to those changesets being invalid has ended.

I think this check is probably wrong and it needs to actually be checking if we've reached the client version at the time of the client reset (or at time of opening).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took a while to figure out how to test it but this is indeed incorrect; it marks the client reset as complete as soon as any changesets are acked rather than when all of the recovered changesets are.

// This is part of the client reset cycle detection.
// A client reset operation will write a flag to an internal table indicating that
// the changes there are a result of a successful reset. However, it is not possible to
// know if a recovery has been successful until the changes have been acknowledged by the
// server. The situation we want to avoid is that a recovery itself causes another reset
// which creates a reset cycle. However, at this point, upload progress has been made
// and we can remove the cycle detection flag if there is one.
PendingResetStore::remove_if_complete(*m_group, progress.upload.client_version, logger);
}

m_progress_download = progress.download;

trim_sync_history(); // Throws
Expand Down
5 changes: 3 additions & 2 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ClientHistory final : public _impl::History, public TransformHistory {
/// \param downloadable_bytes If specified, and if the implementation cares
/// about byte-level progress, this function updates the persistent record
/// of the estimate of the number of remaining bytes to be downloaded.
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&);
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&,
util::Logger& logger);

/// \brief Scan through the history for changesets to be uploaded.
///
Expand Down Expand Up @@ -421,7 +422,7 @@ class ClientHistory final : public _impl::History, public TransformHistory {
void prepare_for_write();
Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset);
void add_sync_history_entry(const HistoryEntry&);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes, util::Logger& logger);
void trim_ct_history();
void trim_sync_history();
void do_trim_sync_history(std::size_t n);
Expand Down
23 changes: 11 additions & 12 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast6
"received empty download message that was not the last in batch",
ProtocolError::bad_progress);
}
history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
history.set_sync_progress(progress, downloadable_bytes, version_info, logger); // Throws
return;
}

Expand Down Expand Up @@ -1718,9 +1718,6 @@ void Session::activate()
catch (...) {
on_integration_failure(IntegrationException(exception_to_status()));
}

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
}


Expand Down Expand Up @@ -2270,16 +2267,18 @@ bool Session::client_reset_if_needed()
m_progress.download.last_integrated_client_version);
REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);

m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
// Reset the cached values which are used to calculate progress since the
// last time sync completed
init_progress_handler();
// In recovery mode, there may be new changesets to upload and nothing left to download.
// In FLX DiscardLocal mode, there may be new commits due to subscription handling.
// For both, we want to allow uploads again without needing external changes to download first.
m_delay_uploads = false;

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
// Update the download progress to match what it would have been if we'd
// received a MARK message from the server (as the fresh Realm which we used
// as the source data for the reset did).
m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
m_server_version_at_last_download_mark = m_progress.download.server_version;
m_last_download_mark_received = m_last_download_mark_sent = m_target_download_mark;
check_for_download_completion();
michael-wb marked this conversation as resolved.
Show resolved Hide resolved

// If a migration or rollback is in progress, mark it complete when client reset is completed.
if (auto migration_store = get_migration_store()) {
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,6 @@ class ClientImpl::Session {
void process_pending_flx_bootstrap();

bool client_reset_if_needed();
void handle_pending_client_reset_acknowledgement();

void gather_pending_compensating_writes(util::Span<Changeset> changesets, std::vector<ProtocolErrorInfo>* out);

Expand Down
14 changes: 14 additions & 0 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,20 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut
}
}

// If there was nothing to recover or recovery was disabled then immediately
// mark the client reset as successfully complete
if (recovered.empty()) {
logger.info(util::LogCategory::reset,
"Immediately removing client reset tracker as there are no recovered changesets to upload.");
sync::PendingResetStore::clear_pending_reset(*wt_local);
}
else {
logger.debug(util::LogCategory::reset,
"Marking %1 as the version which must be uploaded to complete client reset recovery.",
recovered.back().version);
sync::PendingResetStore::set_recovered_version(*wt_local, recovered.back().version);
}

wt_local->commit_and_continue_as_read();

VersionID new_version_local = wt_local->get_version_of_current_transaction();
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/migration_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ bool MigrationStore::load_data(bool read_only)

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the changes to migration store, pending bootstrap store, sync metadata schema, pending bootstrap store, and subscriptions are just secondary effects of making PendingResetStore::has_pending_reset() take a Group rather than a TransactionRef.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside from passing around a ref vs const ref to a shared_ptr, is there a reason why this changed from a Transaction to a Group?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside from passing around a const ref instead of a const ref to a shared_ptr, is there a reason the parameter is a Group now and not a Transaction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside from passing around a ref vs ptr, is there a reason why this changed from a Transaction to a Group?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The root change is enabling PendingResetStore::has_pending_reset(realm->read_group()), which previously didn't work because the function expected a Transaction even though it didn't do anything which required a Transaction. All of these functions should have been taking a Group the whole time as they don't change the transaction state.

if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_migration_store)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_flx_migration_store)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
"Invalid schema version for flexible sync migration store metadata");
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger,

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_pending_bootstraps)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::SchemaVersionMismatch,
"Invalid schema version for FLX sync pending bootstrap table group");
Expand Down
Loading
Loading