Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync client shall not block user writes #5844

Merged
merged 23 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e9322ee
Reenable sync benchmark
danieltabacaru Sep 12, 2022
9e7e4f6
Update sync benchmark
danieltabacaru Sep 12, 2022
313ed46
Add method to check if other threads are waiting for the write mutex
danieltabacaru Sep 12, 2022
5429e85
Allow users to commit changes while sync client is integrating remote…
danieltabacaru Sep 12, 2022
7c62329
Fix typos/grammar in comments
danieltabacaru Sep 12, 2022
54b35d1
Fix hack in async write unit test
danieltabacaru Sep 12, 2022
6ef9f0e
Add sync unit tests
danieltabacaru Sep 12, 2022
793517e
Adapt benchmark code to use proper arguments in function call
danieltabacaru Sep 12, 2022
433d591
Clang-format + minor changes
danieltabacaru Sep 12, 2022
aa74107
Code review changes
danieltabacaru Sep 16, 2022
d5843dc
Fix bugs introduced after code review changes
danieltabacaru Sep 17, 2022
c44c806
Use thread instead of future
danieltabacaru Sep 21, 2022
3d15242
Move _impl::ForEventLoopDispatcher out of realm::util namespace so re…
danieltabacaru Sep 21, 2022
168ef37
Refactor transforming and applying server changesets into separate me…
danieltabacaru Sep 21, 2022
07856ba
Merge branch 'master' into dt/sync_client_shall_not_block_user_writes_2
danieltabacaru Sep 21, 2022
36ef668
Fix test after introducing DownloadBatchState::SteadyState
danieltabacaru Sep 21, 2022
97c5417
Update changelog
danieltabacaru Sep 21, 2022
b3584c8
Add integration test for pbs
danieltabacaru Sep 21, 2022
5640dcb
Merge branch 'master' into dt/sync_client_shall_not_block_user_writes_2
danieltabacaru Sep 21, 2022
475b669
It is a violation to call on_flx_sync_progress with SteadyState
danieltabacaru Sep 23, 2022
cea8594
Remove flaky tests
danieltabacaru Sep 26, 2022
1e68dd3
Merge branch 'master' into dt/sync_client_shall_not_block_user_writes_2
danieltabacaru Sep 26, 2022
6091910
Remove unused test code
danieltabacaru Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Prioritize integration of local changes over remote changes - shorten the time users may have to wait when committing local changes. Stop storing downloaded changesets in history. ([PR #5844](https://github.com/realm/realm-core/pull/5844)).

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
Expand All @@ -17,7 +17,7 @@
-----------

### Internals
* None.
* Reenable sync benchmark.

----------------------------------------------

Expand Down Expand Up @@ -7549,4 +7549,4 @@ Format:
2012-06-??
----------
- Group() interfaced changed. Now with multiple options. default option changed from readonly...
+ Generated C++ highlevel API for tables with up to 15 columns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. But there is no change.

+ Generated C++ highlevel API for tables with up to 15 columns
2 changes: 1 addition & 1 deletion doc/algebra_of_changesets.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ stepwise for a concatenated changeset:
S(α, A + B) = S(S(α, A), B) (1)

**Definition:** Two changesets `A` and `B`, having the same base state, `α`, are
*equivalent*, written as `A ~ B`, if, and onlæy if they produce the same final
*equivalent*, written as `A ~ B`, if, and only if they produce the same final
state, that is, if, and only if `S(α, A) = S(α, B)`. This does not mean that `A`
and `B` are equal.

Expand Down
10 changes: 10 additions & 0 deletions evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ functions:
exit 1
fi

export UNITTEST_THREADS=1

BENCHMARK=$(./evergreen/abspath.sh ./build/test/benchmark-${benchmark_name}/${cmake_build_type|Debug}/realm-benchmark-${benchmark_name})
echo "Going to run benchmark $BENCHMARK"

Expand Down Expand Up @@ -525,6 +527,14 @@ tasks:
vars:
benchmark_name: crud

- name: benchmark-sync
exec_timeout_secs: 1800
tags: [ "benchmark" ]
commands:
- func: "run benchmark"
vars:
benchmark_name: sync

- name: sync-tests
tags: [ "test_suite", "for_pull_requests" ]
exec_timeout_secs: 1800
Expand Down
11 changes: 11 additions & 0 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,17 @@ void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_ope
}
}

bool DB::other_writers_waiting_for_lock() const
{
SharedInfo* info = m_file_map.get_addr();

uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's safe for us to read these in any order because we hold the write lock? is that correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, at least the way we use it.

But I can think of a scenario where a thread starts writing (i.e, acquires the lock) and checks if other threads are waiting for the lock while at the same time another thread closes the transaction causing next_served to be incremented. This could then cause some wrong results. Any thoughts @finnschiermer ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future reference: the above scenario cannot happen. The only thread which can close the transaction is the one holding the lock.

// When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
// 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
return next_ticket > next_served + 1;
}

class DB::AsyncCommitHelper {
public:
AsyncCommitHelper(DB* db)
Expand Down
4 changes: 4 additions & 0 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ class DB : public std::enable_shared_from_this<DB> {
void claim_sync_agent();
void release_sync_agent();

/// Returns true if there are threads waiting to acquire the write lock, false otherwise.
/// To be used only when already holding the lock.
bool other_writers_waiting_for_lock() const;

protected:
explicit DB(const DBOptions& options); // Is this ever used?

Expand Down
15 changes: 12 additions & 3 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,10 @@ void SyncSession::create_sync_session()
session_config.simulate_integration_error = sync_config.simulate_integration_error;
if (sync_config.on_download_message_received_hook) {
session_config.on_download_message_received_hook =
[hook = sync_config.on_download_message_received_hook, anchor = weak_from_this()](
const sync::SyncProgress& progress, int64_t query_version, sync::DownloadBatchState batch_state) {
hook(anchor, progress, query_version, batch_state);
[hook = sync_config.on_download_message_received_hook,
anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version,
sync::DownloadBatchState batch_state, size_t num_changesets) {
hook(anchor, progress, query_version, batch_state, num_changesets);
};
}
if (sync_config.on_bootstrap_message_processed_hook) {
Expand All @@ -753,6 +754,14 @@ void SyncSession::create_sync_session()
return hook(anchor, progress, query_version, batch_state);
};
}
if (sync_config.on_download_message_integrated_hook) {
session_config.on_download_message_integrated_hook =
[hook = sync_config.on_download_message_integrated_hook,
anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version,
sync::DownloadBatchState batch_state, size_t num_changesets) {
hook(anchor, progress, query_version, batch_state, num_changesets);
};
}

{
std::string sync_route = m_sync_manager->sync_route();
Expand Down
4 changes: 4 additions & 0 deletions src/realm/object-store/util/event_loop_dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class EventLoopDispatcher<void(Args...)> {
}
};

} // namespace util
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious why we needed to do this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a problem on Windows due to the compiler. event_loop_dispatcher.hpp defined things in realm::util namespace, which results in _impl::EnableIfSpanCompatible being realm::util::_impl::EnableIfSpanCompatible if that header is included. But the code relies on realm::_impl::EnableIfSpanCompatible existing. It is a "fix" that Thomas suggested.


namespace _impl::ForEventLoopDispatcher {
template <typename Sig>
struct ExtractSignatureImpl {
Expand Down Expand Up @@ -106,6 +108,8 @@ template <typename T>
using ExtractSignature = typename ExtractSignatureImpl<T>::signature;
} // namespace _impl::ForEventLoopDispatcher

namespace util {

// Deduction guide for function pointers.
template <typename... Args>
EventLoopDispatcher(void (*)(Args...)) -> EventLoopDispatcher<void(Args...)>;
Expand Down
4 changes: 4 additions & 0 deletions src/realm/sync/changeset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ struct Changeset {
/// be part of refactoring the ChangesetIndex
size_t transform_sequence = 0;

/// If the changeset was compacted during download, the size of the original
/// changeset. Only applies to changesets sent by the server.
std::size_t original_changeset_size = 0;

/// Compare for exact equality, including that interned strings have the
/// same integer values, and there is the same number of interned strings,
/// same topology of tombstones, etc.
Expand Down
54 changes: 43 additions & 11 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

std::function<void(const SyncProgress&, int64_t, DownloadBatchState)> m_on_download_message_received_hook;
std::function<void(const SyncProgress&, int64_t, DownloadBatchState, size_t)> m_on_download_message_received_hook;
std::function<bool(const SyncProgress&, int64_t, DownloadBatchState)> m_on_bootstrap_message_processed_hook;
std::function<void(const SyncProgress&, int64_t, DownloadBatchState, size_t)> on_download_message_integrated_hook;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
int64_t m_flx_active_version = 0;
Expand Down Expand Up @@ -731,12 +732,7 @@ void SessionImpl::on_resumed()
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets)
{
if (!m_is_flx_sync_session) {
return false;
}

// If this is a steady state DOWNLOAD, no need for special handling.
if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
if (is_steady_state_download_message(batch_state, query_version)) {
return false;
}

Expand Down Expand Up @@ -810,12 +806,15 @@ void SessionImpl::process_pending_flx_bootstrap()

history.integrate_server_changesets(
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
[&](const TransactionRef& tr) {
bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size());
[&](const TransactionRef& tr, size_t count) {
REALM_ASSERT_3(count, <=, pending_batch.changesets.size());
bootstrap_store->pop_front_pending(tr, count);
},
get_transact_reporter());
progress = *pending_batch.progress;

download_message_integrated_hook(progress, query_version, batch_state, pending_batch.changesets.size());

logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
"%3. %4 changesets remaining in bootstrap",
pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
Expand Down Expand Up @@ -860,12 +859,40 @@ void SessionImpl::non_sync_flx_completion(int64_t version)
}

void SessionImpl::receive_download_message_hook(const SyncProgress& progress, int64_t query_version,
DownloadBatchState batch_state)
DownloadBatchState batch_state, size_t num_changesets)
{
if (REALM_LIKELY(!m_wrapper.m_on_download_message_received_hook)) {
return;
}
m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state);
m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state, num_changesets);
}

void SessionImpl::download_message_integrated_hook(const SyncProgress& progress, int64_t query_version,
DownloadBatchState batch_state, size_t num_changesets)
{
if (REALM_LIKELY(!m_wrapper.on_download_message_integrated_hook)) {
return;
}

m_wrapper.on_download_message_integrated_hook(progress, query_version, batch_state, num_changesets);
}

bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
if (batch_state == DownloadBatchState::SteadyState) {
return true;
}

if (!m_is_flx_sync_session) {
return true;
}

// If this is a steady state DOWNLOAD, no need for special handling.
if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
return true;
}

return false;
}

// ################ SessionWrapper ################
Expand All @@ -891,6 +918,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_proxy_config{config.proxy_config} // Throws
, m_on_download_message_received_hook(std::move(config.on_download_message_received_hook))
, m_on_bootstrap_message_processed_hook(config.on_bootstrap_message_processed_hook)
, on_download_message_integrated_hook{std::move(config.on_download_message_integrated_hook)}
, m_flx_subscription_store(std::move(flx_sub_store))
{
REALM_ASSERT(m_db);
Expand Down Expand Up @@ -967,10 +995,14 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
}
REALM_ASSERT(new_version >= m_flx_last_seen_version);
REALM_ASSERT(new_version >= m_flx_active_version);
REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);

SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy

switch (batch_state) {
case DownloadBatchState::SteadyState:
// Cannot be called with this value.
REALM_UNREACHABLE();
case DownloadBatchState::LastInBatch:
if (m_flx_active_version == new_version) {
return;
Expand Down
17 changes: 10 additions & 7 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class BadServerUrl; // Exception
/// their bound state), as long as they are associated with the same client
/// object, or with two different client objects that do not overlap in
/// time. This means, in particular, that it is an error to create two bound
/// session objects for the same local Realm file, it they are associated with
/// session objects for the same local Realm file, if they are associated with
/// two different client objects that overlap in time, even if the session
/// objects do not overlap in time (in their bound state). It is the
/// responsibility of the application to ensure that these rules are adhered
Expand Down Expand Up @@ -313,15 +313,18 @@ class Session {
/// This feature exists exclusively for testing purposes at this time.
bool simulate_integration_error = false;

// Will be called after a download message is received and validated by
// the client but befefore it's been transformed or applied. To be used in
// testing only.
std::function<void(const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
/// Will be called after a download message is received and validated by
/// the client but befefore it's been transformed or applied. To be used in
/// testing only.
std::function<void(const sync::SyncProgress&, int64_t, sync::DownloadBatchState, size_t)>
on_download_message_received_hook;
// Will be called after each bootstrap message is added to the pending bootstrap store,
// but before processing a finalized bootstrap. For testing only.
/// Will be called after each bootstrap message is added to the pending bootstrap store,
/// but before processing a finalized bootstrap. For testing only.
std::function<bool(const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_bootstrap_message_processed_hook;
/// Will be called after a download message is integrated. For testing only.
std::function<void(const sync::SyncProgress&, int64_t, sync::DownloadBatchState, size_t)>
on_download_message_integrated_hook;
};

/// \brief Start a new session for the specified client-side Realm.
Expand Down
7 changes: 6 additions & 1 deletion src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,17 @@ struct SyncConfig {

// Will be called after a download message is received and validated by the client but befefore it's been
// transformed or applied. To be used in testing only.
std::function<void(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
std::function<void(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState,
size_t)>
on_download_message_received_hook;
// Will be called after each bootstrap message is added to the pending bootstrap store, but before
// processing a finalized bootstrap. For testing only.
std::function<bool(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_bootstrap_message_processed_hook;
// Will be called after a download message is integrated. For testing only.
std::function<void(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState,
size_t)>
on_download_message_integrated_hook;

bool simulate_integration_error = false;

Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/changeset_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace _impl {

/// The ChangesetIndex is responsible for keeping track of exactly which
/// instructions touch which objects. It does this by recording ranges of
/// instructions in changesets, such that the merge algorithm can make do with
/// instructions in changesets, such that the merge algorithm can do with
/// just merging the "relevant" instructions. Due to the semantics of link
/// nullification, instruction ranges for objects that have ever been
/// "connected" by a link instruction must be joined together. In other words,
Expand Down
Loading