Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/exception-u…
Browse files Browse the repository at this point in the history
…nification

* origin/master:
  Fix a data race in notifier packaging (#5892)
  Install util/http.hpp (#5893)
  Greatly improve performance of sorting dictionaries (#5168)
  Sync client shall not block user writes (#5844)
  update err message check (#5884)
  • Loading branch information
tgoyne committed Sep 27, 2022
2 parents dd011d1 + 18582a8 commit eb06da5
Show file tree
Hide file tree
Showing 44 changed files with 895 additions and 705 deletions.
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Prioritize integration of local changes over remote changes - shorten the time users may have to wait when committing local changes. Stop storing downloaded changesets in history. ([PR #5844](https://github.com/realm/realm-core/pull/5844)).
* Greatly improve the performance of sorting or distincting a Dictionary's keys or values. The most expensive operation is now performed O(log N) rather than O(N log N) times, and large Dictionaries can see upwards of 99% reduction in time to sort. ([PR #5166](https://github.com/realm/realm-core/pulls/5166))

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* The C API type `realm_sync_error_code_t` did not include a textural representation of the underlying category. ([#5399](https://github.com/realm/realm-core/issues/5399)),
* Calling Results::sum/min/max on a collection field throws an exception rather than aggregating on the underlying memory addresses. ([#5137](https://github.com/realm/realm-core/issues/5137))

* Fix a data race reported by thread sanitizer when preparing to deliver change notifications. This probably did not cause observable problems in practice ([PR #5892](https://github.com/realm/realm-core/pull/5892) since 12.7.0).

### Breaking changes
* All exceptions thrown out of Core are now of type 'Exception'. All use of std::runtime_error and std::logical_error etc. has stopped and the specialized error classes that beforehand were based on these are now based on Exception.

Expand All @@ -18,7 +18,8 @@
-----------

### Internals
* None.
* Reenable sync benchmark.
* Add util/http.hpp to the release package.

----------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion dependencies.list
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PACKAGE_NAME=realm-core
VERSION=12.8.0
OPENSSL_VERSION=1.1.1n
MDBREALM_TEST_SERVER_TAG=2022-09-01
MDBREALM_TEST_SERVER_TAG=2022-09-22
2 changes: 1 addition & 1 deletion doc/algebra_of_changesets.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ stepwise for a concatenated changeset:
S(α, A + B) = S(S(α, A), B) (1)

**Definition:** Two changesets `A` and `B`, having the same base state, `α`, are
*equivalent*, written as `A ~ B`, if, and onlæy if they produce the same final
*equivalent*, written as `A ~ B`, if, and only if they produce the same final
state, that is, if, and only if `S(α, A) = S(α, B)`. This does not mean that `A`
and `B` are equal.

Expand Down
12 changes: 11 additions & 1 deletion evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ functions:
exit 1
fi
export UNITTEST_THREADS=1
BENCHMARK=$(./evergreen/abspath.sh ./build/test/benchmark-${benchmark_name}/${cmake_build_type|Debug}/realm-benchmark-${benchmark_name})
echo "Going to run benchmark $BENCHMARK"
Expand Down Expand Up @@ -525,6 +527,14 @@ tasks:
vars:
benchmark_name: crud

- name: benchmark-sync
exec_timeout_secs: 1800
tags: [ "benchmark" ]
commands:
- func: "run benchmark"
vars:
benchmark_name: sync

- name: sync-tests
tags: [ "test_suite", "for_pull_requests" ]
exec_timeout_secs: 1800
Expand Down Expand Up @@ -567,7 +577,7 @@ tasks:
export DEVELOPER_DIR="${xcode_developer_dir}"
fi
./evergreen/install_baas.sh -w ./baas-work-dir 2>&1 | tee install_baas_output.log
./evergreen/install_baas.sh -w ./baas-work-dir -b master 2>&1 | tee install_baas_output.log
fi
- command: shell.exec
Expand Down
1 change: 1 addition & 0 deletions src/realm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ set(REALM_INSTALL_HEADERS
util/functional.hpp
util/future.hpp
util/hex_dump.hpp
util/http.hpp
util/input_stream.hpp
util/inspect.hpp
util/interprocess_condvar.hpp
Expand Down
11 changes: 11 additions & 0 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,17 @@ void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_ope
}
}

bool DB::other_writers_waiting_for_lock() const
{
SharedInfo* info = m_file_map.get_addr();

uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
// When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
// 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
return next_ticket > next_served + 1;
}

class DB::AsyncCommitHelper {
public:
AsyncCommitHelper(DB* db)
Expand Down
4 changes: 4 additions & 0 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ class DB : public std::enable_shared_from_this<DB> {
void claim_sync_agent();
void release_sync_agent();

/// Returns true if there are threads waiting to acquire the write lock, false otherwise.
/// To be used only when already holding the lock.
bool other_writers_waiting_for_lock() const;

protected:
explicit DB(const DBOptions& options); // Is this ever used?

Expand Down
74 changes: 47 additions & 27 deletions src/realm/dictionary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,27 +485,49 @@ void Dictionary::align_indices(std::vector<size_t>& indices) const
}
}

void Dictionary::sort(std::vector<size_t>& indices, bool ascending) const
namespace {
std::vector<Mixed> get_keys(const Dictionary& dict)
{
std::vector<Mixed> values;
values.reserve(dict.size());
for (auto it = dict.begin(), end = dict.end(); it != end; ++it)
values.push_back(it.key());
return values;
}

std::vector<Mixed> get_values(const Dictionary& dict)
{
std::vector<Mixed> values;
values.reserve(dict.size());
for (auto it = dict.begin(), end = dict.end(); it != end; ++it)
values.push_back(it.value());
return values;
}

void do_sort(std::vector<size_t>& indices, bool ascending, const std::vector<Mixed>& values)
{
align_indices(indices);
auto b = indices.begin();
auto e = indices.end();
std::sort(b, e, [this, ascending](size_t i1, size_t i2) {
auto v1 = get_any(i1);
auto v2 = get_any(i2);
return ascending ? v1 < v2 : v2 < v1;
std::sort(b, e, [ascending, &values](size_t i1, size_t i2) {
return ascending ? values[i1] < values[i2] : values[i2] < values[i1];
});
}
} // anonymous namespace

void Dictionary::distinct(std::vector<size_t>& indices, util::Optional<bool> ascending) const
void Dictionary::sort(std::vector<size_t>& indices, bool ascending) const
{
align_indices(indices);
do_sort(indices, ascending, get_values(*this));
}

bool sort_ascending = ascending ? *ascending : true;
sort(indices, sort_ascending);
void Dictionary::distinct(std::vector<size_t>& indices, util::Optional<bool> ascending) const
{
align_indices(indices);
auto values = get_values(*this);
do_sort(indices, ascending.value_or(true), values);
indices.erase(std::unique(indices.begin(), indices.end(),
[this](size_t i1, size_t i2) {
return get_any(i1) == get_any(i2);
[&values](size_t i1, size_t i2) {
return values[i1] == values[i2];
}),
indices.end());

Expand All @@ -518,13 +540,7 @@ void Dictionary::distinct(std::vector<size_t>& indices, util::Optional<bool> asc
void Dictionary::sort_keys(std::vector<size_t>& indices, bool ascending) const
{
align_indices(indices);
auto b = indices.begin();
auto e = indices.end();
std::sort(b, e, [this, ascending](size_t i1, size_t i2) {
auto k1 = get_key(i1);
auto k2 = get_key(i2);
return ascending ? k1 < k2 : k2 < k1;
});
do_sort(indices, ascending, get_keys(*this));
}

void Dictionary::distinct_keys(std::vector<size_t>& indices, util::Optional<bool>) const
Expand Down Expand Up @@ -1106,34 +1122,38 @@ Dictionary::Iterator::Iterator(const Dictionary* dict, size_t pos)
{
}

auto Dictionary::Iterator::operator*() const -> value_type
Mixed Dictionary::Iterator::key() const
{
update();
Mixed key;
switch (m_key_type) {
case type_String: {
ArrayString keys(m_tree.get_alloc());
ref_type ref = to_ref(Array::get(m_leaf.get_mem().get_addr(), 1));
keys.init_from_ref(ref);
key = Mixed(keys.get(m_state.m_current_index));
break;
return Mixed(keys.get(m_state.m_current_index));
}
case type_Int: {
ArrayInteger keys(m_tree.get_alloc());
ref_type ref = to_ref(Array::get(m_leaf.get_mem().get_addr(), 1));
keys.init_from_ref(ref);
key = Mixed(keys.get(m_state.m_current_index));
break;
return Mixed(keys.get(m_state.m_current_index));
}
default:
REALM_UNREACHABLE();
break;
}
}

Mixed Dictionary::Iterator::value() const
{
ArrayMixed values(m_tree.get_alloc());
ref_type ref = to_ref(Array::get(m_leaf.get_mem().get_addr(), 2));
values.init_from_ref(ref);
return values.get(m_state.m_current_index);
}

return std::make_pair(key, values.get(m_state.m_current_index));
auto Dictionary::Iterator::operator*() const -> value_type
{
update();
return std::make_pair(key(), value());
}


Expand Down
3 changes: 3 additions & 0 deletions src/realm/dictionary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ class Dictionary::Iterator : public ClusterTree::Iterator {

using ClusterTree::Iterator::get_position;

Mixed key() const;
Mixed value() const;

private:
friend class Dictionary;

Expand Down
15 changes: 5 additions & 10 deletions src/realm/object-store/impl/collection_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,28 +413,23 @@ NotifierPackage::NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>
{
}

NotifierPackage::NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>> notifiers)
: m_notifiers(std::move(notifiers))
NotifierPackage::NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>> notifiers,
std::optional<VersionID> version)
: m_version(version)
, m_notifiers(std::move(notifiers))
{
set_version();
}

void NotifierPackage::package_and_wait(VersionID::version_type target_version)
{
if (!m_coordinator || !*this)
return;

m_coordinator->package_notifiers(m_notifiers, target_version);
set_version();
m_version = m_coordinator->package_notifiers(m_notifiers, target_version);
if (m_version && m_version->version < target_version)
m_version = util::none;
}

void NotifierPackage::set_version()
{
m_version = m_notifiers.empty() ? util::none : std::make_optional(m_notifiers.front()->version());
}

void NotifierPackage::before_advance()
{
for (auto& notifier : m_notifiers)
Expand Down
4 changes: 1 addition & 3 deletions src/realm/object-store/impl/collection_notifier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class NotifierPackage {
NotifierPackage() = default;

// Create a package which contains notifiers which have already been pacakged for delivery
NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>> notifiers);
NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>> notifiers, std::optional<VersionID> version);
// Create a package which can have package_and_wait() called on it later
NotifierPackage(std::vector<std::shared_ptr<CollectionNotifier>> notifiers, RealmCoordinator* coordinator);

Expand Down Expand Up @@ -362,8 +362,6 @@ class NotifierPackage {
util::Optional<VersionID> m_version;
std::vector<std::shared_ptr<CollectionNotifier>> m_notifiers;
RealmCoordinator* m_coordinator = nullptr;

void set_version();
};

} // namespace realm::_impl
Expand Down
15 changes: 9 additions & 6 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,8 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
}

// We have notifiers for a newer version, so advance to that
transaction::advance(tr, realm.m_binding_context.get(), std::move(notifiers));
transaction::advance(tr, realm.m_binding_context.get(),
_impl::NotifierPackage(std::move(notifiers), notifier_version));
}

std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
Expand All @@ -1177,11 +1178,11 @@ bool RealmCoordinator::advance_to_latest(Realm& realm)
util::CheckedUniqueLock lock(m_notifier_mutex);
notifiers = notifiers_for_realm(realm);
}
package_notifiers(notifiers, m_db->get_version_of_latest_snapshot());
auto version = package_notifiers(notifiers, m_db->get_version_of_latest_snapshot());

auto version = tr->get_version_of_current_transaction();
transaction::advance(tr, realm.m_binding_context.get(), _impl::NotifierPackage(std::move(notifiers)));
return !realm.is_closed() && version != tr->get_version_of_current_transaction();
auto prev_version = tr->get_version_of_current_transaction();
transaction::advance(tr, realm.m_binding_context.get(), _impl::NotifierPackage(std::move(notifiers), version));
return !realm.is_closed() && prev_version != tr->get_version_of_current_transaction();
}

void RealmCoordinator::promote_to_write(Realm& realm)
Expand Down Expand Up @@ -1236,7 +1237,8 @@ void RealmCoordinator::process_available_async(Realm& realm)
realm.m_binding_context->did_send_notifications();
}

void RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::version_type target_version)
std::optional<VersionID> RealmCoordinator::package_notifiers(NotifierVector& notifiers,
VersionID::version_type target_version)
{
auto ready = [&] {
util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
Expand All @@ -1259,6 +1261,7 @@ void RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::v
notifier->version().version < target_version;
};
notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), package), end(notifiers));
return notifiers.empty() ? std::optional<VersionID>{} : notifiers.front()->version();
}

bool RealmCoordinator::compact()
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/impl/realm_coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
// Called by NotifierPackage in the cases where we don't know what version
// we need notifiers for until after we begin advancing (e.g. when
// starting a write transaction).
void package_notifiers(NotifierVector& notifiers, VersionID::version_type)
std::optional<VersionID> package_notifiers(NotifierVector& notifiers, VersionID::version_type)
REQUIRES(!m_notifier_mutex, !m_running_notifiers_mutex);

// testing hook only to verify that notifiers are not being run at times
Expand Down
15 changes: 12 additions & 3 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,10 @@ void SyncSession::create_sync_session()
session_config.simulate_integration_error = sync_config.simulate_integration_error;
if (sync_config.on_download_message_received_hook) {
session_config.on_download_message_received_hook =
[hook = sync_config.on_download_message_received_hook, anchor = weak_from_this()](
const sync::SyncProgress& progress, int64_t query_version, sync::DownloadBatchState batch_state) {
hook(anchor, progress, query_version, batch_state);
[hook = sync_config.on_download_message_received_hook,
anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version,
sync::DownloadBatchState batch_state, size_t num_changesets) {
hook(anchor, progress, query_version, batch_state, num_changesets);
};
}
if (sync_config.on_bootstrap_message_processed_hook) {
Expand All @@ -753,6 +754,14 @@ void SyncSession::create_sync_session()
return hook(anchor, progress, query_version, batch_state);
};
}
if (sync_config.on_download_message_integrated_hook) {
session_config.on_download_message_integrated_hook =
[hook = sync_config.on_download_message_integrated_hook,
anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version,
sync::DownloadBatchState batch_state, size_t num_changesets) {
hook(anchor, progress, query_version, batch_state, num_changesets);
};
}

{
std::string sync_route = m_sync_manager->sync_route();
Expand Down
4 changes: 4 additions & 0 deletions src/realm/object-store/util/event_loop_dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class EventLoopDispatcher<void(Args...)> {
}
};

} // namespace util

namespace _impl::ForEventLoopDispatcher {
template <typename Sig>
struct ExtractSignatureImpl {
Expand Down Expand Up @@ -106,6 +108,8 @@ template <typename T>
using ExtractSignature = typename ExtractSignatureImpl<T>::signature;
} // namespace _impl::ForEventLoopDispatcher

namespace util {

// Deduction guide for function pointers.
template <typename... Args>
EventLoopDispatcher(void (*)(Args...)) -> EventLoopDispatcher<void(Args...)>;
Expand Down
Loading

0 comments on commit eb06da5

Please sign in to comment.