Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BindingCallbackThreadObserver to be one per sync client #6156

Merged
merged 31 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9297774
Added test and updated binding callback c_api - updated commented out…
Jan 20, 2023
7c95e91
REQUIRE() not REALM_ASSERT()
Jan 26, 2023
f60cc60
Update/Fix changelog
Jan 26, 2023
019fe2a
clang format
Jan 26, 2023
1b69942
Allow nullptr userdata
Jan 26, 2023
bfdcceb
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 28, 2023
c6e7b59
Revert changelog
Jan 28, 2023
3261f7f
Reworked event loop in test to fix TSAN failure
Jan 28, 2023
2bbd6e4
Reworked binding callback thread observer to fix TSAN issues
Jan 29, 2023
d649105
Include mutex header file
Jan 29, 2023
5ac0e3e
Updated changelog and renamed func name
Jan 29, 2023
80e09e9
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 30, 2023
24b2d08
Updates from review
Jan 30, 2023
6c648c8
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 31, 2023
868f0f9
Updated test to remove operator==
Feb 1, 2023
147b6f2
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 2, 2023
a8ea573
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 6, 2023
f890ed1
Fix compile
Feb 6, 2023
b9a1cf0
Removed global binding callback thread observer
Feb 21, 2023
555e12d
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 22, 2023
0a9812a
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 22, 2023
4567abd
sync tests are no longer hanging
Feb 22, 2023
6d2f826
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 24, 2023
a2c1241
Reverted sync client changes and disabled MultipleSyncAgents test
Feb 24, 2023
d8cb9d0
Removed service.reset()
Feb 24, 2023
b73d863
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 24, 2023
b972b54
Updated changelog after merging release
Feb 24, 2023
047a9b6
Cleaned up binding callback observer interface and c_api
Feb 24, 2023
4f39bd2
Additional thread observer cleanup
Feb 24, 2023
0e92873
removed unused header files
Feb 24, 2023
552a97c
Found one more unused header file
Feb 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
* Client reset recovery froze Realms for the callbacks in an invalid way. It is unclear if this resulted in any actual problems.
* Fix default enabled debug output during realm-sync-tests ([#6233](https://github.com/realm/realm-core/issues/6233))
* Update ClientImpl::Connection and DefaultWebSocketImpl to use the new WebSocketObserver callbacks ([PR #6219](https://github.com/realm/realm-core/pull/6219))
* Migrate service and event loop into DefaultSyncSocket ([PR #6151](https://github.com/realm/realm-core/pull/6151))
* Move BindingCallbackThreadObserver from object-store to sync ([PR #6151](https://github.com/realm/realm-core/pull/6151))
* Add Binding Callback Thread Observer CAPI Test and updated Sync_MultipleSyncAgentsNotAllowed test ([PR #6156](https://github.com/realm/realm-core/pull/6156))

----------------------------------------------

Expand Down Expand Up @@ -80,8 +83,6 @@
* Add c_api error category for resolve errors instead of reporting unknown category. ([PR #6157](https://github.com/realm/realm-core/pull/6157))
* Add permanent redirect (308) as a supported redirect response from the server. ([#6162](https://github.com/realm/realm-core/issues/6162))
* Integrate DefaultSocketProvider as SyncSocketProvider in sync client. ([PR #6171](https://github.com/realm/realm-core/pull/6171))
* Migrate service and event loop into DefaultSyncSocket ([PR #6151](https://github.com/realm/realm-core/pull/6151))
* Move BindingCallbackThreadObserver from object-store to sync ([PR #6151](https://github.com/realm/realm-core/pull/6151))

----------------------------------------------

Expand Down
9 changes: 6 additions & 3 deletions src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -4069,17 +4069,20 @@ typedef struct realm_thread_observer_token realm_thread_observer_token_t;

/**
* Register a callback handler for bindings interested in registering callbacks before/after the ObjectStore thread
* runs.
* runs. There can only be one callback handler registered at a time. Call realm_release() on the returned token
* pointer before assigning a new set of callback handlers.
* @param on_thread_create callback invoked when the object store thread is created
* @param on_thread_destroy callback invoked when the object store thread is destroyed
* @param on_error callback invoked to signal to the listener that some error has occured.
* @param user_data pointer to user defined data that is provided to each of the callback functions
* @param free_userdata callback invoked when the user_data is to be freed
* @return a token that has to be released in order to stop receiving notifications
*/
RLM_API realm_thread_observer_token_t*
realm_set_binding_callback_thread_observer(realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy,
realm_on_object_store_error_callback_t on_error, realm_userdata_t,
realm_free_userdata_func_t free_userdata);
realm_on_object_store_error_callback_t on_error,
realm_userdata_t user_data, realm_free_userdata_func_t free_userdata);

#endif // REALM_ENABLE_SYNC

Expand Down
21 changes: 2 additions & 19 deletions src/realm/object-store/c_api/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ realm_refresh_callback_token::~realm_refresh_callback_token()
#if REALM_ENABLE_SYNC
realm_thread_observer_token::~realm_thread_observer_token()
{
realm::g_binding_callback_thread_observer = nullptr;
realm::c_api::CBindingThreadObserver::reset();
}
#endif // REALM_ENABLE_SYNC

Expand Down Expand Up @@ -365,24 +365,7 @@ realm_set_binding_callback_thread_observer(realm_on_object_store_thread_callback
realm_on_object_store_error_callback_t on_error, realm_userdata_t userdata,
realm_free_userdata_func_t free_userdata)
{
realm::c_api::CBindingThreadObserver::ThreadCallback thread_create =
[on_thread_create, userdata = UserdataPtr{userdata, free_userdata}]() {
on_thread_create(userdata.get());
};

realm::c_api::CBindingThreadObserver::ThreadCallback thread_destroyed =
[on_thread_destroy, userdata = UserdataPtr{userdata, free_userdata}]() {
on_thread_destroy(userdata.get());
};

realm::c_api::CBindingThreadObserver::ErrorCallback error =
[on_error, userdata = UserdataPtr{userdata, free_userdata}](const char* error) {
on_error(userdata.get(), error);
};

auto& instance = realm::c_api::CBindingThreadObserver::create();
instance.set(std::move(thread_create), std::move(thread_destroyed), std::move(error));
g_binding_callback_thread_observer = &instance;
realm::c_api::CBindingThreadObserver::set(on_thread_create, on_thread_destroy, on_error, userdata, free_userdata);
return new realm_thread_observer_token_t();
}

Expand Down
46 changes: 30 additions & 16 deletions src/realm/object-store/c_api/realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,45 +71,59 @@ class CBindingContext : public BindingContext {

class CBindingThreadObserver : public realm::BindingCallbackThreadObserver {
public:
using ThreadCallback = util::UniqueFunction<void()>;
using ErrorCallback = util::UniqueFunction<void(const char*)>;

static CBindingThreadObserver& create()
static void set(realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy,
realm_on_object_store_error_callback_t on_error, realm_userdata_t userdata,
realm_free_userdata_func_t free_userdata)
{
static CBindingThreadObserver instance;
return instance;
auto& observer = CBindingThreadObserver::get_instance();
observer.m_create_callback = on_thread_create;
observer.m_destroy_callback = on_thread_destroy;
observer.m_error_callback = on_error;
observer.m_user_data = UserdataPtr(userdata, free_userdata);
g_binding_callback_thread_observer = &observer;
}

void set(ThreadCallback&& on_create, ThreadCallback&& on_destroy, ErrorCallback&& on_error)
static void reset()
{
m_create_callback = std::move(on_create);
m_destroy_callback = std::move(on_destroy);
m_error_callback = std::move(on_error);
g_binding_callback_thread_observer = nullptr;
auto& observer = CBindingThreadObserver::get_instance();
observer.m_create_callback = nullptr;
observer.m_destroy_callback = nullptr;
observer.m_error_callback = nullptr;
observer.m_user_data.reset();
}

void did_create_thread() override
{
if (m_create_callback)
m_create_callback();
m_create_callback(m_user_data.get());
}

void will_destroy_thread() override
{
if (m_destroy_callback)
m_destroy_callback();
m_destroy_callback(m_user_data.get());
}

void handle_error(std::exception const& e) override
{
if (m_error_callback)
m_error_callback(e.what());
m_error_callback(m_user_data.get(), e.what());
}

private:
static CBindingThreadObserver& get_instance()
{
static CBindingThreadObserver instance;
return instance;
}

CBindingThreadObserver() = default;
ThreadCallback m_create_callback;
ThreadCallback m_destroy_callback;
ErrorCallback m_error_callback;
realm_on_object_store_thread_callback_t m_create_callback = nullptr;
realm_on_object_store_thread_callback_t m_destroy_callback = nullptr;
realm_on_object_store_error_callback_t m_error_callback = nullptr;
UserdataPtr m_user_data;
};

#endif // REALM_ENABLE_SYNC
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/binding_callback_thread_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace realm {
// This is for example helpful to attach/detach the pthread to the JavaVM in order to be able to perform JNI calls.
class BindingCallbackThreadObserver {
public:
virtual ~BindingCallbackThreadObserver() = default;

// This method is called just before the thread is started
virtual void did_create_thread() = 0;

Expand Down
68 changes: 68 additions & 0 deletions test/object-store/c_api/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <realm/object-store/c_api/types.hpp>
#include <realm/object-store/c_api/conversion.hpp>
#include <realm/object-store/sync/generic_network_transport.hpp>
#include <realm/sync/binding_callback_thread_observer.hpp>
#include <realm/util/base64.hpp>

#include "sync/flx_sync_harness.hpp"
Expand Down Expand Up @@ -4910,6 +4911,73 @@ TEST_CASE("C API - async_open", "[c_api][sync]") {
realm_release(sync_config);
}
}

struct BCTOState {
static bool bcto_deleted;
bool thread_create_called = false;
bool thread_destroy_called = false;
std::string thread_on_error_message;
std::string id = "BTCO-STATE";
};

bool BCTOState::bcto_deleted = false;

TEST_CASE("C API - binding callback thread observer") {
auto bcto_user_data = new BCTOState();
BCTOState::bcto_deleted = false;

auto bcto_free_userdata = [](realm_userdata_t userdata) {
REQUIRE(userdata);
REQUIRE(BCTOState::bcto_deleted == false);
auto user_data = static_cast<BCTOState*>(userdata);
REQUIRE((user_data && user_data->id == "BTCO-STATE"));
user_data->id.clear();
delete user_data;
BCTOState::bcto_deleted = true;
};

auto bcto_on_thread_create = [](realm_userdata_t userdata) {
REQUIRE(userdata);
REQUIRE(BCTOState::bcto_deleted == false);
auto user_data = static_cast<BCTOState*>(userdata);
REQUIRE((user_data && user_data->id == "BTCO-STATE"));
user_data->thread_create_called = true;
};

auto bcto_on_thread_destroy = [](realm_userdata_t userdata) {
REQUIRE(userdata);
REQUIRE(BCTOState::bcto_deleted == false);
auto user_data = static_cast<BCTOState*>(userdata);
REQUIRE((user_data && user_data->id == "BTCO-STATE"));
user_data->thread_destroy_called = true;
};

auto bcto_on_thread_error = [](realm_userdata_t userdata, const char* err_message) {
REQUIRE(userdata);
REALM_ASSERT(err_message);
REQUIRE(BCTOState::bcto_deleted == false);
auto user_data = static_cast<BCTOState*>(userdata);
REQUIRE((user_data && user_data->id == "BTCO-STATE"));
user_data->thread_on_error_message = err_message;
};

auto bcto_token = realm_set_binding_callback_thread_observer(
bcto_on_thread_create, bcto_on_thread_destroy, bcto_on_thread_error,
static_cast<realm_userdata_t>(bcto_user_data), bcto_free_userdata);
REQUIRE(bcto_token);
REQUIRE(g_binding_callback_thread_observer);
g_binding_callback_thread_observer->did_create_thread();
REQUIRE(bcto_user_data->thread_create_called);
g_binding_callback_thread_observer->handle_error(MultipleSyncAgents());
REQUIRE(bcto_user_data->thread_on_error_message.find("Multiple sync agents attempted to join the same session") !=
std::string::npos);
g_binding_callback_thread_observer->will_destroy_thread();
REQUIRE(bcto_user_data->thread_destroy_called);

realm_release(static_cast<void*>(bcto_token));
REQUIRE(BCTOState::bcto_deleted == true);
REQUIRE(g_binding_callback_thread_observer == nullptr);
}
#endif

#ifdef REALM_ENABLE_AUTH_TESTS
Expand Down
87 changes: 74 additions & 13 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>

#include <realm.hpp>
#include <realm/chunked_binary.hpp>
#include <realm/data_type.hpp>
#include <realm/history.hpp>
#include <realm/impl/simulated_failure.hpp>
#include <realm/list.hpp>
#include <realm/sync/binding_callback_thread_observer.hpp>
#include <realm/sync/changeset.hpp>
#include <realm/sync/changeset_encoder.hpp>
#include <realm/sync/client.hpp>
Expand Down Expand Up @@ -3732,8 +3734,6 @@ TEST(Sync_UploadDownloadProgress_6)
// The check is that we reach this point without deadlocking.
}

// Event Loop TODO: re-enable test once errors are propogating
#if 0
TEST(Sync_MultipleSyncAgentsNotAllowed)
{
// At most one sync agent is allowed to participate in a Realm file access
Expand All @@ -3743,18 +3743,79 @@ TEST(Sync_MultipleSyncAgentsNotAllowed)
// particular session participant over the "temporally overlapping access"
// relation.

TEST_CLIENT_DB(db);
Client::Config config;
config.logger = test_context.logger;
config.reconnect_mode = ReconnectMode::testing;
Client client{config};
Session session_1{client, db, nullptr};
Session session_2{client, db, nullptr};
session_1.bind("realm://foo/bar", "blablabla");
session_2.bind("realm://foo/bar", "blablabla");
CHECK_THROW(client.run(), MultipleSyncAgents);
struct TestThreadObserver : public BindingCallbackThreadObserver {
// This method is called just before the thread is started
void did_create_thread() final
{
create_thread_called = true;
}

// This method is called just before the thread is being destroyed
void will_destroy_thread() final
{
destroy_thread_called = true;
}

// This method is called with any exception thrown by client.run().
void handle_error(std::exception const& e) final
{
std::lock_guard<std::mutex> lock(m_error_mutex);
error_message = e.what(); // save a copy of the exception
error_cv.notify_all();
}

// Wait up to timeout seconds for the exception passed to handle_error to be
// thrown by this function. Returns false if exception was not thrown before timeout.
bool wait_for_error(std::chrono::seconds timeout)
{
std::unique_lock<std::mutex> lock(m_error_mutex);
// Skip the wait if the exception has already been processed
if (error_message.empty()) {
if (!error_cv.wait_for(lock, timeout, [this]() {
return !error_message.empty();
})) {
return false; // wait timed out - exception did not occur
}
}
return true;
}

// Return the captured exception or an empty std::exception if none captured
const std::string& get_error()
{
std::lock_guard<std::mutex> lock(m_error_mutex);
return error_message;
}

bool create_thread_called = false;
bool destroy_thread_called = false;
std::string error_message;
std::mutex m_error_mutex;
std::condition_variable error_cv;
};

auto callback_observer = std::make_unique<TestThreadObserver>();
g_binding_callback_thread_observer = callback_observer.get();
{
using namespace std::literals::chrono_literals;
TEST_CLIENT_DB(db);
Client::Config config;
config.logger = test_context.logger;
config.reconnect_mode = ReconnectMode::testing;
Client client{config};
Session session_1{client, db, nullptr};
Session session_2{client, db, nullptr};
session_1.bind("realm://foo/bar", "blablabla");
session_2.bind("realm://foo/bar", "blablabla");
// Wait up to 10 seconds for the exception to be thrown by event loop
CHECK(callback_observer->wait_for_error(10s));
CHECK_STRING_CONTAINS(callback_observer->get_error(),
"Multiple sync agents attempted to join the same session");
}
CHECK(callback_observer->create_thread_called);
CHECK(callback_observer->destroy_thread_called);
g_binding_callback_thread_observer = nullptr;
}
#endif

TEST(Sync_CancelReconnectDelay)
{
Expand Down