Skip to content

Commit

Permalink
RCORE-2192 RCORE-2193 Fix FLX download progress reporting (#7870)
Browse files Browse the repository at this point in the history
* Fix FLX download progress reporting

We need to store the download progress for each batch of a bootstrap and not
just at the end for it to be useful in any way.

The server will sometimes send us DOWNLOAD messages with a non-one estimate
followed by a one estimate where the byte-level information is the same (as the
final message is empty). When this happens we need to report the download
completion to the user, so add the estimate to the fields checked for changes.

A subscription change which doesn't actually change what set of objects is in
view can result in an empty DOWNLOAD message with no changes other than the
query version, and we should report that too.

* Fix a comment

* Pass the DownloadMessage to process_flx_bootstrap_message()

* Report steady-state download progress
  • Loading branch information
tgoyne authored Jul 15, 2024
1 parent fac06bf commit 4f83c59
Showing 16 changed files with 384 additions and 97 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* FLX download progress was only updated when bootstraps completed, making it always be 0 before the first completion and then forever 1. ([PR #7869](https://github.com/realm/realm-core/issues/7869), since v14.10.2)

### Breaking changes
* None.
19 changes: 19 additions & 0 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
@@ -65,6 +65,25 @@ void AsyncOpenTask::start(AsyncOpenCallback callback)
session->revive_if_needed();
}

util::Future<ThreadSafeReference> AsyncOpenTask::start()
{
auto pf = util::make_promise_future<ThreadSafeReference>();
start([promise = std::move(pf.promise)](ThreadSafeReference&& ref, std::exception_ptr e) mutable {
if (e) {
try {
std::rethrow_exception(e);
}
catch (...) {
promise.set_error(exception_to_status());
}
}
else {
promise.emplace_value(std::move(ref));
}
});
return std::move(pf.future);
}

void AsyncOpenTask::cancel()
{
std::shared_ptr<SyncSession> session;
9 changes: 9 additions & 0 deletions src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

#include <realm/util/checked_mutex.hpp>
#include <realm/util/functional.hpp>
#include <realm/util/future.hpp>

#include <memory>
#include <vector>
@@ -47,13 +48,21 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
std::shared_ptr<realm::SyncSession> session, bool db_open_for_the_first_time);
AsyncOpenTask(const AsyncOpenTask&) = delete;
AsyncOpenTask& operator=(const AsyncOpenTask&) = delete;

// Starts downloading the Realm. The callback will be triggered either when the download completes
// or an error is encountered.
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a "Cancelled" exception.
void start(AsyncOpenCallback callback) REQUIRES(!m_mutex);

// Starts downloading the Realm. The future will be fulfilled either when the download completes
// or an error is encountered.
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a cancelled Status
util::Future<ThreadSafeReference> start() REQUIRES(!m_mutex);

// Cancels the download and stops the session. No further functions should be called on this class.
void cancel() REQUIRES(!m_mutex);

55 changes: 31 additions & 24 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
@@ -149,12 +149,15 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
uint64_t uploadable;
uint64_t downloaded;
uint64_t downloadable;
int64_t query_version;
double download_estimate;

// Does not check snapshot
bool operator==(const ReportedProgress& p) const noexcept
{
return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
downloadable == p.downloadable;
downloadable == p.downloadable && query_version == p.query_version &&
download_estimate == p.download_estimate;
}
};
std::optional<ReportedProgress> m_reported_progress;
@@ -800,27 +803,31 @@ void SessionImpl::update_subscription_version_info()
}
}

bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets)
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Ignore the call if the session is not active
if (m_state != State::Active) {
// Not a bootstrap message if this isn't a FLX download
if (!message.last_in_batch || !message.query_version) {
return false;
}

if (is_steady_state_download_message(batch_state, query_version)) {
REALM_ASSERT(m_is_flx_sync_session);

// Not a bootstrap message if it's for the already active query version
if (*message.last_in_batch && *message.query_version == m_wrapper.m_flx_active_version) {
return false;
}

auto batch_state = *message.last_in_batch ? DownloadBatchState::LastInBatch : DownloadBatchState::MoreToCome;
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
std::optional<SyncProgress> maybe_progress;
if (batch_state == DownloadBatchState::LastInBatch) {
maybe_progress = progress;
maybe_progress = message.progress;
}

bool new_batch = false;
try {
bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
bootstrap_store->add_batch(*message.query_version, std::move(maybe_progress), message.downloadable,
message.changesets, &new_batch);
}
catch (const LogicError& ex) {
if (ex.code() == ErrorCodes::LimitExceeded) {
@@ -836,11 +843,11 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do
// If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
// bootstrapping.
if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
}

auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
batch_state, received_changesets.size());
auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
*message.query_version, batch_state, message.changesets.size());
if (hook_action == SyncClientHookAction::EarlyReturn) {
return true;
}
@@ -904,7 +911,6 @@ void SessionImpl::process_pending_flx_bootstrap()

auto batch_state =
pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
uint64_t downloadable_bytes = 0;
query_version = pending_batch.query_version;
bool simulate_integration_error =
(m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
@@ -916,8 +922,8 @@ void SessionImpl::process_pending_flx_bootstrap()
batch_state, pending_batch.changesets.size());

history.integrate_server_changesets(
*pending_batch.progress, downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
*pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
[&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
bootstrap_store->pop_front_pending(tr, changesets_applied.size());
});
@@ -1647,6 +1653,7 @@ void SessionWrapper::check_progress()
DownloadableProgress downloadable;
ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
uploaded_version);
p.query_version = m_flx_last_seen_version;

report_progress(p, downloadable);
report_upload_completion(uploaded_version);
@@ -1701,28 +1708,28 @@ void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress d
upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);

bool download_completed = p.downloaded == 0;
double download_estimate = 1.00;
p.download_estimate = 1.00;
if (m_flx_pending_bootstrap_store) {
p.download_estimate = downloadable.as_estimate();
if (m_flx_pending_bootstrap_store->has_pending()) {
download_estimate = downloadable.as_estimate();
p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
}
download_completed = download_estimate >= 1.0;
download_completed = p.download_estimate >= 1.0;

// for flx with download estimate these bytes are not known
// provide some sensible value for non-streaming version of object-store callbacks
// until these field are completely removed from the api after pbs deprecation
p.downloadable = p.downloaded;
if (download_estimate > 0 && download_estimate < 1.0 && p.downloaded > m_final_downloaded)
p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / download_estimate);
if (p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded)
p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
}
else {
// uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
// is only the remaining to download. This is confusing, so make them use
// the same units.
p.downloadable = downloadable.as_bytes() + p.downloaded;
if (!download_completed)
download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
}

if (download_completed)
@@ -1745,12 +1752,12 @@ void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress d
m_sess->logger.debug(
"Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot, m_flx_active_version);
p.downloaded, p.downloadable, to_str(p.download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot, p.query_version);
}

m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
upload_estimate, m_flx_last_seen_version);
m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
upload_estimate, p.query_version);
}

util::Future<std::string> SessionWrapper::send_test_command(std::string body)
12 changes: 12 additions & 0 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
@@ -953,6 +953,18 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
trim_sync_history(); // Throws
}

void ClientHistory::set_download_progress(Transaction& tr, DownloadableProgress p)
{
using gf = _impl::GroupFriend;
ref_type ref = gf::get_history_ref(tr);
REALM_ASSERT(ref);
Array root(gf::get_alloc(tr));
root.init_from_ref(ref);
gf::set_history_parent(tr, root);
REALM_ASSERT(root.size() > s_progress_uploadable_bytes_iip);
root.set(s_progress_downloadable_bytes_iip,
RefOrTagged::make_tagged(p.as_bytes())); // Throws
}

void ClientHistory::trim_ct_history()
{
9 changes: 9 additions & 0 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
@@ -258,6 +258,15 @@ class ClientHistory final : public _impl::History, public TransformHistory {
std::uint_fast64_t&, std::uint_fast64_t&, version_type&);
static void get_upload_download_state(DB*, std::uint_fast64_t&, std::uint_fast64_t&);

/// Record the current download progress.
///
/// This is used when storing FLX bootstraps to make the progress available
/// to other processes which are observing the file. It must be called
/// inside of a write transaction. The data stored here is only meaningful
/// until the next call of integrate_server_changesets(), which will
/// overwrite it.
static void set_download_progress(Transaction& tr, DownloadableProgress);

// Overriding member functions in realm::TransformHistory
version_type find_history_entry(version_type, version_type, HistoryEntry&) const noexcept override;
ChunkedBinaryData get_reciprocal_transform(version_type, bool&) const override;
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
@@ -2467,7 +2467,7 @@ Status Session::receive_download_message(const DownloadMessage& message)
}
REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);

if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) {
if (process_flx_bootstrap_message(message)) {
clear_resumption_delay_state();
return Status::OK();
}
3 changes: 1 addition & 2 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
@@ -920,8 +920,7 @@ class ClientImpl::Session {
// Processes an FLX download message, if it's a bootstrap message. If it's not a bootstrap
// message then this is a noop and will return false. Otherwise this will return true
// and no further processing of the download message should take place.
bool process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets);
bool process_flx_bootstrap_message(const DownloadMessage& message);

// Processes any pending FLX bootstraps, if one exists. Otherwise this is a noop.
void process_pending_flx_bootstrap();
4 changes: 4 additions & 0 deletions src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
#include "realm/list.hpp"
#include "realm/query.hpp"
#include "realm/sync/changeset_parser.hpp"
#include "realm/sync/noinst/client_history_impl.hpp"
#include "realm/sync/noinst/protocol_codec.hpp"
#include "realm/sync/noinst/sync_metadata_schema.hpp"
#include "realm/sync/protocol.hpp"
@@ -127,6 +128,7 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger)
}

void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
DownloadableProgress download_progress,
const _impl::ClientProtocol::ReceivedChangesets& changesets,
bool* created_new_batch_out)
{
@@ -176,6 +178,8 @@ void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<Sync
cur_changeset.set(m_changeset_data, compressed_data);
}

ClientHistory::set_download_progress(*tr, download_progress);

tr->commit();

if (created_new_batch_out) {
3 changes: 2 additions & 1 deletion src/realm/sync/noinst/pending_bootstrap_store.hpp
Original file line number Diff line number Diff line change
@@ -79,7 +79,8 @@ class PendingBootstrapStore {

// Adds a set of changesets to the store.
void add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
const std::vector<RemoteChangeset>& changesets, bool* created_new_batch);
DownloadableProgress download_progress, const std::vector<RemoteChangeset>& changesets,
bool* created_new_batch);

void clear();
void clear(Transaction& wt);
17 changes: 1 addition & 16 deletions test/object-store/sync/app.cpp
Original file line number Diff line number Diff line change
@@ -4471,22 +4471,7 @@ TEST_CASE("app: full-text compatible with sync", "[sync][app][baas]") {
INFO("realm opened with async open");
auto async_open_task = Realm::get_synchronized_realm(config);

auto [realm_promise, realm_future] = util::make_promise_future<ThreadSafeReference>();
async_open_task->start(
[promise = std::move(realm_promise)](ThreadSafeReference ref, std::exception_ptr ouch) mutable {
if (ouch) {
try {
std::rethrow_exception(ouch);
}
catch (...) {
promise.set_error(exception_to_status());
}
}
else {
promise.emplace_value(std::move(ref));
}
});

auto realm_future = async_open_task->start();
realm = Realm::get_shared_realm(std::move(realm_future.get()));
}

16 changes: 2 additions & 14 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
@@ -4690,20 +4690,8 @@ TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset]
};

auto realm_task = Realm::get_synchronized_realm(realm_config);
auto realm_pf = util::make_promise_future<SharedRealm>();
realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) {
auto& promise = realm_pf.promise;
try {
if (ex) {
std::rethrow_exception(ex);
}
promise.emplace_value(Realm::get_shared_realm(std::move(ref), util::Scheduler::make_dummy()));
}
catch (...) {
promise.set_error(exception_to_status());
}
});
auto realm = realm_pf.future.get();
auto realm_future = realm_task->start();
auto realm = Realm::get_shared_realm(std::move(realm_future).get(), util::Scheduler::make_dummy());
before_callback_called.future.get();
after_callback_called.future.get();
REQUIRE(subscription_invoked.load());
Loading

0 comments on commit 4f83c59

Please sign in to comment.