Skip to content

Commit

Permalink
Convert the remaining tests over to the client reset endpoint
Browse files Browse the repository at this point in the history
These tests were a giant race condition that only worked with fairly specific
timing from the server, and in practice didn't actually test the situation they
were intending to test.
  • Loading branch information
tgoyne committed Nov 29, 2023
1 parent e28f7d3 commit 950ec59
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 124 deletions.
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.send_test_command(std::move(request));
}

static sync::SaltedFileIdent get_file_ident(SyncSession& session)
static sync::SaltedFileIdent get_file_ident(const SyncSession& session)
{
return session.get_file_ident();
}
Expand Down
85 changes: 38 additions & 47 deletions test/object-store/sync/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,73 +1866,64 @@ TEST_CASE("sync: Client reset during async open", "[sync][pbs][client reset][baa
TestAppSession test_app_session(create_app(server_app_config));
auto app = test_app_session.app();

auto before_callback_called = util::make_promise_future<void>();
auto after_callback_called = util::make_promise_future<void>();
create_user_and_log_in(app);
SyncTestFile realm_config(app->current_user(), partition.value, std::nullopt,
[](std::shared_ptr<SyncSession>, SyncError) { /*noop*/ });
realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;

realm_config.sync_config->on_sync_client_event_hook =
[&, client_reset_triggered = false](std::weak_ptr<SyncSession> weak_sess,
const SyncClientHookData& event_data) mutable {
auto sess = weak_sess.lock();
if (!sess) {
return SyncClientHookAction::NoAction;
}
if (sess->path() != realm_config.path) {
return SyncClientHookAction::NoAction;
}
bool client_reset_triggered = false;
realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
const SyncClientHookData& event_data) mutable {
auto sess = weak_sess.lock();
if (!sess) {
return SyncClientHookAction::NoAction;
}
if (sess->path() != realm_config.path) {
return SyncClientHookAction::NoAction;
}

if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
return SyncClientHookAction::NoAction;
}
if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
return SyncClientHookAction::NoAction;
}

if (client_reset_triggered) {
return SyncClientHookAction::NoAction;
}
client_reset_triggered = true;
reset_utils::trigger_client_reset(test_app_session.app_session());
return SyncClientHookAction::EarlyReturn;
};
if (client_reset_triggered) {
return SyncClientHookAction::NoAction;
}
client_reset_triggered = true;
reset_utils::trigger_client_reset(test_app_session.app_session(), *sess);
return SyncClientHookAction::SuspendWithRetryableError;
};

// Expected behaviour is that the frozen realm passed in the callback should have no
// schema initialized if a client reset happens during an async open and the realm has never been opened before.
// SDK's should handle any edge cases which require the use of a schema i.e
// calling set_schema_subset(...)
realm_config.sync_config->notify_before_client_reset =
[promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))](
std::shared_ptr<Realm> realm) mutable {
CHECK(realm->schema_version() == ObjectStore::NotVersioned);
promise.get_promise().emplace_value();
};
auto before_callback_called = util::make_promise_future<void>();
realm_config.sync_config->notify_before_client_reset = [&](std::shared_ptr<Realm> realm) {
CHECK(realm->schema_version() == ObjectStore::NotVersioned);
before_callback_called.promise.emplace_value();
};

realm_config.sync_config->notify_after_client_reset =
[promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))](
std::shared_ptr<Realm> realm, ThreadSafeReference, bool) mutable {
CHECK(realm->schema_version() == ObjectStore::NotVersioned);
promise.get_promise().emplace_value();
};
auto after_callback_called = util::make_promise_future<void>();
realm_config.sync_config->notify_after_client_reset = [&](std::shared_ptr<Realm> realm, ThreadSafeReference,
bool) {
CHECK(realm->schema_version() == ObjectStore::NotVersioned);
after_callback_called.promise.emplace_value();
};

auto realm_task = Realm::get_synchronized_realm(realm_config);
auto realm_pf = util::make_promise_future<SharedRealm>();
realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))](
ThreadSafeReference ref, std::exception_ptr ex) mutable {
auto promise = promise_holder.get_promise();
if (ex) {
try {
realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) {
try {
if (ex) {
std::rethrow_exception(ex);
}
catch (...) {
promise.set_error(exception_to_status());
}
return;
auto realm = Realm::get_shared_realm(std::move(ref));
realm_pf.promise.emplace_value(std::move(realm));
}
auto realm = Realm::get_shared_realm(std::move(ref));
if (!realm) {
promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"});
catch (...) {
realm_pf.promise.set_error(exception_to_status());
}
promise.emplace_value(std::move(realm));
});
auto realm = realm_pf.future.get();
before_callback_called.future.get();
Expand Down
86 changes: 39 additions & 47 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4306,67 +4306,59 @@ TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset]
mutable_subscription.commit();
};

auto before_callback_called = util::make_promise_future<void>();
auto after_callback_called = util::make_promise_future<void>();
realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
realm_config.sync_config->subscription_initializer = subscription_callback;

realm_config.sync_config->on_sync_client_event_hook =
[&, client_reset_triggered = false](std::weak_ptr<SyncSession> weak_sess,
const SyncClientHookData& event_data) mutable {
auto sess = weak_sess.lock();
if (!sess) {
return SyncClientHookAction::NoAction;
}
if (sess->path() != realm_config.path) {
return SyncClientHookAction::NoAction;
}
bool client_reset_triggered = false;
realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
const SyncClientHookData& event_data) {
auto sess = weak_sess.lock();
if (!sess) {
return SyncClientHookAction::NoAction;
}
if (sess->path() != realm_config.path) {
return SyncClientHookAction::NoAction;
}

if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
return SyncClientHookAction::NoAction;
}
if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
return SyncClientHookAction::NoAction;
}

if (client_reset_triggered) {
return SyncClientHookAction::NoAction;
}
client_reset_triggered = true;
reset_utils::trigger_client_reset(harness.session().app_session());
return SyncClientHookAction::EarlyReturn;
};
if (client_reset_triggered) {
return SyncClientHookAction::NoAction;
}

realm_config.sync_config->notify_before_client_reset =
[promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))](
std::shared_ptr<Realm> realm) mutable {
CHECK(realm->schema_version() == 1);
promise.get_promise().emplace_value();
};
client_reset_triggered = true;
reset_utils::trigger_client_reset(harness.session().app_session(), *sess);
return SyncClientHookAction::SuspendWithRetryableError;
};

realm_config.sync_config->notify_after_client_reset =
[promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))](
std::shared_ptr<Realm> realm, ThreadSafeReference, bool) mutable {
CHECK(realm->schema_version() == 1);
promise.get_promise().emplace_value();
};
auto before_callback_called = util::make_promise_future<void>();
realm_config.sync_config->notify_before_client_reset = [&](std::shared_ptr<Realm> realm) {
CHECK(realm->schema_version() == 1);
before_callback_called.promise.emplace_value();
};

auto after_callback_called = util::make_promise_future<void>();
realm_config.sync_config->notify_after_client_reset = [&](std::shared_ptr<Realm> realm, ThreadSafeReference,
bool) {
CHECK(realm->schema_version() == 1);
after_callback_called.promise.emplace_value();
};

auto realm_task = Realm::get_synchronized_realm(realm_config);
auto realm_pf = util::make_promise_future<SharedRealm>();
realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))](
ThreadSafeReference ref, std::exception_ptr ex) mutable {
auto promise = promise_holder.get_promise();
if (ex) {
try {
realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) {
auto& promise = realm_pf.promise;
try {
if (ex) {
std::rethrow_exception(ex);
}
catch (...) {
promise.set_error(exception_to_status());
}
return;
promise.emplace_value(Realm::get_shared_realm(std::move(ref)));
}
auto realm = Realm::get_shared_realm(std::move(ref));
if (!realm) {
promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"});
catch (...) {
promise.set_error(exception_to_status());
}
promise.emplace_value(std::move(realm));
});
auto realm = realm_pf.future.get();
before_callback_called.future.get();
Expand Down
6 changes: 6 additions & 0 deletions test/object-store/util/sync/baas_admin_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,12 @@ AppSession create_app(const AppCreateConfig& config)
{"version", 1},
});

// Wait for initial sync to complete, as connecting while this is happening
// causes various problems
timed_sleeping_wait_for([&] {
return session.is_initial_sync_complete(app_id);
});

return {client_app_id, app_id, session, config};
}

Expand Down
33 changes: 5 additions & 28 deletions test/object-store/util/sync/sync_test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,39 +507,16 @@ void wait_for_num_objects_in_atlas(std::shared_ptr<SyncUser> user, const AppSess
std::chrono::minutes(15), std::chrono::milliseconds(500));
}

void trigger_client_reset(const AppSession& app_session)
void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session)
{
// cause a client reset by restarting the sync service
// this causes the server's sync history to be resynthesized
auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
auto baas_sync_config = app_session.admin_api.get_config(app_session.server_app_id, baas_sync_service);

REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id));
app_session.admin_api.disable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config);
timed_sleeping_wait_for([&] {
return app_session.admin_api.is_sync_terminated(app_session.server_app_id);
});
app_session.admin_api.enable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config);
REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id));
if (app_session.config.dev_mode_enabled) { // dev mode is not sticky across a reset
app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
}

// In FLX sync, the server won't let you connect until the initial sync is complete. With PBS tho, we need
// to make sure we've actually copied all the data from atlas into the realm history before we do any of
// our remote changes.
if (!app_session.config.flx_sync_config) {
timed_sleeping_wait_for([&] {
return app_session.admin_api.is_initial_sync_complete(app_session.server_app_id);
});
}
auto file_ident = SyncSession::OnlyForTesting::get_file_ident(sync_session);
REQUIRE(file_ident.ident != 0);
app_session.admin_api.trigger_client_reset(app_session.server_app_id, file_ident.ident);
}

void trigger_client_reset(const AppSession& app_session, const SharedRealm& realm)
{
auto file_ident = SyncSession::OnlyForTesting::get_file_ident(*realm->sync_session());
REQUIRE(file_ident.ident != 0);
app_session.admin_api.trigger_client_reset(app_session.server_app_id, file_ident.ident);
trigger_client_reset(app_session, *realm->sync_session());
}

struct BaasClientReset : public TestClientReset {
Expand Down
2 changes: 1 addition & 1 deletion test/object-store/util/sync/sync_test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void wait_for_object_to_persist_to_atlas(std::shared_ptr<SyncUser> user, const A
void wait_for_num_objects_in_atlas(std::shared_ptr<SyncUser> user, const AppSession& app_session,
const std::string& schema_name, size_t expected_size);

void trigger_client_reset(const AppSession& app_session);
void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session);
void trigger_client_reset(const AppSession& app_session, const SharedRealm& realm);
#endif // REALM_ENABLE_AUTH_TESTS

Expand Down

0 comments on commit 950ec59

Please sign in to comment.