Skip to content

Commit

Permalink
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Browse files Browse the repository at this point in the history
…oop-migration
  • Loading branch information
Michael Wilkerson-Barker committed Jan 21, 2023
2 parents c93aecd + f438b7a commit f5d2ff2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fixed diverging history in flexible sync if writes occur during bootstrap to objects that just came into view ([#5804](https://github.com/realm/realm-core/issues/5804), since v11.7.0)
* Fix several data races when opening cached frozen Realms. New frozen Realms were added to the cache and the lock released before they were fully initialized, resulting in races if they were immediately read from the cache on another thread ([PR #6211](https://github.com/realm/realm-core/pull/6211), since v6.0.0).
* Properties and types not present in the requested schema would be missing from the reported schema in several scenarios, such as if the Realm was being opened with a different schema version than the persisted one, and if the new tables or columns were added while the Realm instance did not have an active read transaction ([PR #6211](https://github.com/realm/realm-core/pull/6211), since v13.2.0).
* If a client reset w/recovery or discard local is interrupted while the "fresh" realm is being downloaded, the sync client may crash with a MultpleSyncAgents exception ([#6217](https://github.com/realm/realm-core/issues/6217), since v11.13.0)

### Breaking changes
* `SyncSession::log_out()` has been renamed to `SyncSession::force_close()` to reflect what it actually does ([#6183](https://github.com/realm/realm-core/pull/6183))
Expand Down
54 changes: 32 additions & 22 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
auto mode = config(&SyncConfig::client_resync_mode);
if (mode == ClientResyncMode::Recover) {
handle_fresh_realm_downloaded(
nullptr, {"A client reset is required but the server does not permit recovery for this client"},
nullptr,
{ErrorCodes::RuntimeError,
"A client reset is required but the server does not permit recovery for this client"},
server_requests_action);
}
}
Expand Down Expand Up @@ -419,36 +421,36 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
db = DB::create(sync::make_client_replication(), fresh_path, options);
}
}
catch (std::exception const& e) {
catch (...) {
// Failed to open the fresh path after attempting to delete it, so we
// just can't do automatic recovery.
handle_fresh_realm_downloaded(nullptr, std::string(e.what()), server_requests_action);
handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
return;
}

util::CheckedLockGuard state_lock(m_state_mutex);
if (m_state != State::Active) {
return;
}
std::shared_ptr<SyncSession> sync_session;
std::shared_ptr<SyncSession> fresh_sync_session;
{
util::CheckedLockGuard config_lock(m_config_mutex);
RealmConfig config = m_config;
config.path = fresh_path;
// deep copy the sync config so we don't modify the live session's config
config.sync_config = std::make_shared<SyncConfig>(*m_config.sync_config);
config.sync_config->stop_policy = SyncSessionStopPolicy::Immediately;
config.sync_config->client_resync_mode = ClientResyncMode::Manual;
sync_session = create(m_client, db, config, m_sync_manager);
fresh_sync_session = m_sync_manager->get_session(db, config);
auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
// the fresh Realm may apply writes to this db after it has outlived its sync session
// the writes are used to generate a changeset for recovery, but are never committed
history.set_write_validator_factory({});
}

sync_session->assert_mutex_unlocked();
fresh_sync_session->assert_mutex_unlocked();
if (m_flx_subscription_store) {
sync::SubscriptionSet active = m_flx_subscription_store->get_active();
auto fresh_sub_store = sync_session->get_flx_subscription_store();
auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store();
REALM_ASSERT(fresh_sub_store);
auto fresh_mut_sub = fresh_sub_store->get_latest().make_mutable_copy();
fresh_mut_sub.import(active);
Expand All @@ -458,37 +460,41 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
.get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
// Keep the sync session alive while it's downloading, but then close
// it immediately
sync_session->close();
fresh_sync_session->force_close();
if (auto strong_self = weak_self.lock()) {
if (s.is_ok()) {
strong_self->handle_fresh_realm_downloaded(db, none, server_requests_action);
strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action);
}
else {
strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status().reason(),
server_requests_action);
strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status(), server_requests_action);
}
}
});
}
else { // pbs
sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](std::error_code ec) {
fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](std::error_code ec) {
// Keep the sync session alive while it's downloading, but then close
// it immediately
sync_session->close();
fresh_sync_session->force_close();
if (auto strong_self = weak_self.lock()) {
if (ec) {
strong_self->handle_fresh_realm_downloaded(nullptr, ec.message(), server_requests_action);
if (ec == util::error::operation_aborted) {
strong_self->handle_fresh_realm_downloaded(nullptr, {ErrorCodes::OperationAborted, ec.message()},
server_requests_action);
}
else if (ec) {
strong_self->handle_fresh_realm_downloaded(nullptr, {ErrorCodes::RuntimeError, ec.message()},
server_requests_action);
}
else {
strong_self->handle_fresh_realm_downloaded(db, none, server_requests_action);
strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action);
}
}
});
}
sync_session->revive_if_needed();
fresh_sync_session->revive_if_needed();
}

void SyncSession::handle_fresh_realm_downloaded(DBRef db, util::Optional<std::string> error_message,
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action)
{
util::CheckedUniqueLock lock(m_state_mutex);
Expand All @@ -499,7 +505,10 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, util::Optional<std::st
// - unable to write the fresh copy to the file system
// - during download of the fresh copy, the fresh copy itself is reset
// - in FLX mode there was a problem fulfilling the previously active subscription
if (error_message) {
if (!status.is_ok()) {
if (status == ErrorCodes::OperationAborted) {
return;
}
lock.unlock();
if (m_flx_subscription_store) {
// In DiscardLocal mode, only the active subscription set is preserved
Expand All @@ -508,12 +517,13 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, util::Optional<std::st
auto mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
m_flx_subscription_store->supercede_all_except(mut_sub);
mut_sub.update_state(sync::SubscriptionSet::State::Error,
util::make_optional<std::string_view>(*error_message));
util::make_optional<std::string_view>(status.reason()));
std::move(mut_sub).commit();
}
const bool is_fatal = true;
SyncError synthetic(make_error_code(sync::Client::Error::auto_client_reset_failure),
util::format("A fatal error occured during client reset: '%1'", error_message), is_fatal);
util::format("A fatal error occured during client reset: '%1'", status.reason()),
is_fatal);
handle_error(synthetic);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

void download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
REQUIRES(!m_config_mutex, !m_state_mutex, !m_connection_state_mutex);
void handle_fresh_realm_downloaded(DBRef db, util::Optional<std::string> error_message,
void handle_fresh_realm_downloaded(DBRef db, Status status,
sync::ProtocolErrorInfo::Action server_requests_action)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_error(SyncError) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
Expand Down
95 changes: 95 additions & 0 deletions test/object-store/sync/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,101 @@ TableRef get_table(Realm& realm, StringData object_type)
namespace cf = realm::collection_fixtures;
using reset_utils::create_object;

TEST_CASE("sync: large reset with recovery is restartable", "[client reset]") {
const reset_utils::Partition partition{"realm_id", random_string(20)};
Property partition_prop = {partition.property_name, PropertyType::String | PropertyType::Nullable};
Schema schema{
{"object",
{
{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
{"value", PropertyType::String},
partition_prop,
}},
};

std::string base_url = get_base_url();
REQUIRE(!base_url.empty());
auto server_app_config = minimal_app_config(base_url, "client_reset_tests", schema);
server_app_config.partition_key = partition_prop;
TestAppSession test_app_session(create_app(server_app_config));
auto app = test_app_session.app();

create_user_and_log_in(app);
SyncTestFile realm_config(app->current_user(), partition.value, schema);
realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
realm_config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError err) {
if (err.error_code == util::make_error_code(util::MiscExtErrors::end_of_input)) {
return;
}

if (err.server_requests_action == sync::ProtocolErrorInfo::Action::Warning ||
err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient) {
return;
}

FAIL(util::format("got error from server: %1: %2", err.error_code.value(), err.message));
};

auto realm = Realm::get_shared_realm(realm_config);
std::vector<ObjectId> expected_obj_ids;
{
auto obj_id = ObjectId::gen();
expected_obj_ids.push_back(obj_id);
realm->begin_transaction();
CppContext c(realm);
Object::create(c, realm, "object",
std::any(AnyDict{{"_id", obj_id},
{"value", std::string{"hello world"}},
{partition.property_name, partition.value}}));
realm->commit_transaction();
wait_for_upload(*realm);
reset_utils::wait_for_object_to_persist_to_atlas(app->current_user(), test_app_session.app_session(),
"object", {{"_id", obj_id}});
realm->sync_session()->pause();
}

reset_utils::trigger_client_reset(test_app_session.app_session());
{
SyncTestFile realm_config(app->current_user(), partition.value, schema);
auto second_realm = Realm::get_shared_realm(realm_config);

second_realm->begin_transaction();
CppContext c(second_realm);
for (size_t i = 0; i < 100; ++i) {
auto obj_id = ObjectId::gen();
expected_obj_ids.push_back(obj_id);
Object::create(c, second_realm, "object",
std::any(AnyDict{{"_id", obj_id},
{"value", random_string(1024 * 128)},
{partition.property_name, partition.value}}));
}
second_realm->commit_transaction();

wait_for_upload(*second_realm);
}

realm->sync_session()->resume();
timed_wait_for([&] {
return util::File::exists(_impl::ClientResetOperation::get_fresh_path_for(realm_config.path));
});
realm->sync_session()->pause();
realm->sync_session()->resume();
wait_for_upload(*realm);
wait_for_download(*realm);

realm->refresh();
auto table = realm->read_group().get_table("class_object");
REQUIRE(table->size() == expected_obj_ids.size());
std::vector<ObjectId> found_object_ids;
for (const auto& obj : *table) {
found_object_ids.push_back(obj.get_primary_key().get_object_id());
}

std::stable_sort(expected_obj_ids.begin(), expected_obj_ids.end());
std::stable_sort(found_object_ids.begin(), found_object_ids.end());
REQUIRE(expected_obj_ids == found_object_ids);
}

TEST_CASE("sync: pending client resets are cleared when downloads are complete", "[client reset]") {
const reset_utils::Partition partition{"realm_id", random_string(20)};
Property partition_prop = {partition.property_name, PropertyType::String | PropertyType::Nullable};
Expand Down

0 comments on commit f5d2ff2

Please sign in to comment.