Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BindingCallbackThreadObserver to be one per sync client #6156

Merged
merged 31 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9297774
Added test and updated binding callback c_api - updated commented out…
Jan 20, 2023
7c95e91
REQUIRE() not REALM_ASSERT()
Jan 26, 2023
f60cc60
Update/Fix changelog
Jan 26, 2023
019fe2a
clang format
Jan 26, 2023
1b69942
Allow nullptr userdata
Jan 26, 2023
bfdcceb
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 28, 2023
c6e7b59
Revert changelog
Jan 28, 2023
3261f7f
Reworked event loop in test to fix TSAN failure
Jan 28, 2023
2bbd6e4
Reworked binding callback thread observer to fix TSAN issues
Jan 29, 2023
d649105
Include mutex header file
Jan 29, 2023
5ac0e3e
Updated changelog and renamed func name
Jan 29, 2023
80e09e9
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 30, 2023
24b2d08
Updates from review
Jan 30, 2023
6c648c8
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Jan 31, 2023
868f0f9
Updated test to remove operator==
Feb 1, 2023
147b6f2
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 2, 2023
a8ea573
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 6, 2023
f890ed1
Fix compile
Feb 6, 2023
b9a1cf0
Removed global binding callback thread observer
Feb 21, 2023
555e12d
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 22, 2023
0a9812a
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 22, 2023
4567abd
sync tests are no longer hanging
Feb 22, 2023
6d2f826
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 24, 2023
a2c1241
Reverted sync client changes and disabled MultipleSyncAgents test
Feb 24, 2023
d8cb9d0
Removed service.reset()
Feb 24, 2023
b73d863
Merge branch 'master' of github.com:realm/realm-core into mwb/event-l…
Feb 24, 2023
b972b54
Updated changelog after merging release
Feb 24, 2023
047a9b6
Cleaned up binding callback observer interface and c_api
Feb 24, 2023
4f39bd2
Additional thread observer cleanup
Feb 24, 2023
0e92873
removed unused header files
Feb 24, 2023
552a97c
Found one more unused header file
Feb 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* Add per app support for BindingCallbackThreadObserver ([#6250](https://github.com/realm/realm-core/issues/6250))

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.

### Breaking changes
* None.
* BindingCallbackThreadObserver interface was updated to be part of SyncClientConfig and global instance was removed. ([PR #6156](https://github.com/realm/realm-core/pull/6156))

### Compatibility
* Fileformat: Generates files with format v23. Reads and automatically upgrade from fileformat v5.

-----------

### Internals
* None.
* Add CAPI test for Binding Callback Thread Observer. ([PR #6156](https://github.com/realm/realm-core/pull/6156))

----------------------------------------------

Expand Down
19 changes: 10 additions & 9 deletions src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ typedef struct realm_thread_safe_reference realm_thread_safe_reference_t;
typedef void (*realm_free_userdata_func_t)(realm_userdata_t userdata);
typedef realm_userdata_t (*realm_clone_userdata_func_t)(const realm_userdata_t userdata);
typedef void (*realm_on_object_store_thread_callback_t)(realm_userdata_t userdata);
typedef void (*realm_on_object_store_error_callback_t)(realm_userdata_t userdata, const char*);
typedef bool (*realm_on_object_store_error_callback_t)(realm_userdata_t userdata, const char*);

/* Accessor types */
typedef struct realm_object realm_object_t;
Expand Down Expand Up @@ -3850,18 +3850,19 @@ RLM_API void realm_register_user_code_callback_error(realm_userdata_t usercode_e
typedef struct realm_thread_observer_token realm_thread_observer_token_t;

/**
* Register a callback handler for bindings interested in registering callbacks before/after the ObjectStore thread
* runs.
* Register an app local callback handler for bindings interested in registering callbacks before/after
* the ObjectStore thread runs for this app. This only works for the default socket provider implementation.
* @param config SyncClientConfig ptr created by realm_sync_client_config_new()
* @param on_thread_create callback invoked when the object store thread is created
* @param on_thread_destroy callback invoked when the object store thread is destroyed
* @param on_error callback invoked to signal to the listener that some error has occured.
* @return a token that has to be released in order to stop receiving notifications
* @param user_data pointer to user defined data that is provided to each of the callback functions
* @param free_userdata callback invoked when the user_data is to be freed
*/
RLM_API realm_thread_observer_token_t*
realm_set_binding_callback_thread_observer(realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy,
realm_on_object_store_error_callback_t on_error, realm_userdata_t,
realm_free_userdata_func_t free_userdata);
RLM_API void realm_sync_client_config_set_default_binding_thread_observer(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are both functions needed? Isn't this one enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One sets the global thread observer (the way it works now) and the other sets the thread observer in SyncClientConfig to specify one on a "per app" basis (the new way).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to keep using it the way it works now (with the global thread observer)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep both for now, and, if the Java/Kotlin SDK team decides to move one way or the other, we can remove the other method. I'm not sure if/when they will be ready to migrate to the app-specific usage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we decided to just have the per-app version...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the (only) function for setting the thread observer in the SyncClientConfig. If you prefer, I can move this function declaration so it is with the other realm_sync_client_config_... functions

realm_sync_client_config_t* config, realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy, realm_on_object_store_error_callback_t on_error,
realm_userdata_t user_data, realm_free_userdata_func_t free_userdata);

#endif // REALM_ENABLE_SYNC

Expand Down
38 changes: 7 additions & 31 deletions src/realm/object-store/c_api/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ realm_refresh_callback_token::~realm_refresh_callback_token()
realm::c_api::CBindingContext::get(*m_realm).realm_pending_refresh_callbacks().remove(m_token);
}

#if REALM_ENABLE_SYNC
realm_thread_observer_token::~realm_thread_observer_token()
{
realm::g_binding_callback_thread_observer = nullptr;
}
#endif // REALM_ENABLE_SYNC

namespace realm::c_api {

Expand Down Expand Up @@ -360,31 +354,13 @@ void CBindingContext::did_change(std::vector<ObserverState> const&, std::vector<
#if REALM_ENABLE_SYNC

RLM_API
realm_thread_observer_token_t*
realm_set_binding_callback_thread_observer(realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy,
realm_on_object_store_error_callback_t on_error, realm_userdata_t userdata,
realm_free_userdata_func_t free_userdata)
{
realm::c_api::CBindingThreadObserver::ThreadCallback thread_create =
[on_thread_create, userdata = UserdataPtr{userdata, free_userdata}]() {
on_thread_create(userdata.get());
};

realm::c_api::CBindingThreadObserver::ThreadCallback thread_destroyed =
[on_thread_destroy, userdata = UserdataPtr{userdata, free_userdata}]() {
on_thread_destroy(userdata.get());
};

realm::c_api::CBindingThreadObserver::ErrorCallback error =
[on_error, userdata = UserdataPtr{userdata, free_userdata}](const char* error) {
on_error(userdata.get(), error);
};

auto& instance = realm::c_api::CBindingThreadObserver::create();
instance.set(std::move(thread_create), std::move(thread_destroyed), std::move(error));
g_binding_callback_thread_observer = &instance;
return new realm_thread_observer_token_t();
void realm_sync_client_config_set_default_binding_thread_observer(
realm_sync_client_config_t* config, realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy, realm_on_object_store_error_callback_t on_error,
realm_userdata_t user_data, realm_free_userdata_func_t free_userdata)
{
config->default_socket_provider_thread_observer = std::make_shared<realm::c_api::CBindingThreadObserver>(
on_thread_create, on_thread_destroy, on_error, user_data, free_userdata);
}

#endif // REALM_ENABLE_SYNC
Expand Down
68 changes: 45 additions & 23 deletions src/realm/object-store/c_api/realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,47 +69,69 @@ class CBindingContext : public BindingContext {

#if REALM_ENABLE_SYNC

class CBindingThreadObserver : public realm::BindingCallbackThreadObserver {
struct CBindingThreadObserver : public realm::BindingCallbackThreadObserver {
public:
using ThreadCallback = util::UniqueFunction<void()>;
using ErrorCallback = util::UniqueFunction<void(const char*)>;

static CBindingThreadObserver& create()
CBindingThreadObserver(realm_on_object_store_thread_callback_t on_thread_create,
realm_on_object_store_thread_callback_t on_thread_destroy,
realm_on_object_store_error_callback_t on_error, realm_userdata_t userdata,
realm_free_userdata_func_t free_userdata)
: m_create_callback_func{on_thread_create}
, m_destroy_callback_func{on_thread_destroy}
, m_error_callback_func{on_error}
, m_user_data{UserdataPtr(userdata, free_userdata)}
{
static CBindingThreadObserver instance;
return instance;
}

void set(ThreadCallback&& on_create, ThreadCallback&& on_destroy, ErrorCallback&& on_error)
virtual ~CBindingThreadObserver() = default;

/// {@
/// For testing: Return the values in this CBindingThreadObserver for comparing if two objects
/// have the same callback functions and userdata ptr values.
inline realm_on_object_store_thread_callback_t get_create_callback_func() const noexcept
{
return m_create_callback_func;
}
inline realm_on_object_store_thread_callback_t get_destroy_callback_func() const noexcept
{
return m_destroy_callback_func;
}
inline realm_on_object_store_error_callback_t get_error_callback_func() const noexcept
{
return m_error_callback_func;
}
inline realm_userdata_t get_userdata_ptr() const noexcept
{
m_create_callback = std::move(on_create);
m_destroy_callback = std::move(on_destroy);
m_error_callback = std::move(on_error);
return m_user_data.get();
}
/// @}

protected:
CBindingThreadObserver() = default;

void did_create_thread() override
{
if (m_create_callback)
m_create_callback();
if (m_create_callback_func)
m_create_callback_func(m_user_data.get());
}

void will_destroy_thread() override
{
if (m_destroy_callback)
m_destroy_callback();
if (m_destroy_callback_func)
m_destroy_callback_func(m_user_data.get());
}

void handle_error(std::exception const& e) override
bool handle_error(std::exception const& e) override
{
if (m_error_callback)
m_error_callback(e.what());
if (!m_error_callback_func)
return false;

return m_error_callback_func(m_user_data.get(), e.what());
}

private:
CBindingThreadObserver() = default;
ThreadCallback m_create_callback;
ThreadCallback m_destroy_callback;
ErrorCallback m_error_callback;
realm_on_object_store_thread_callback_t m_create_callback_func = nullptr;
realm_on_object_store_thread_callback_t m_destroy_callback_func = nullptr;
realm_on_object_store_error_callback_t m_error_callback_func = nullptr;
UserdataPtr m_user_data;
};

#endif // REALM_ENABLE_SYNC
Expand Down
3 changes: 2 additions & 1 deletion src/realm/object-store/sync/impl/sync_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ struct SyncClient {
}
auto user_agent = util::format("RealmSync/%1 (%2) %3 %4", REALM_VERSION_STRING, util::get_platform_info(),
config.user_agent_binding_info, config.user_agent_application_info);
return std::make_shared<sync::websocket::DefaultSocketProvider>(logger, std::move(user_agent));
return std::make_shared<sync::websocket::DefaultSocketProvider>(
logger, std::move(user_agent), config.default_socket_provider_thread_observer);
}())
, m_client([&] {
sync::Client::Config c;
Expand Down
5 changes: 5 additions & 0 deletions src/realm/object-store/sync/sync_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <realm/util/checked_mutex.hpp>
#include <realm/util/logger.hpp>
#include <realm/util/optional.hpp>
#include <realm/sync/binding_callback_thread_observer.hpp>
#include <realm/sync/config.hpp>
#include <realm/sync/socket_provider.hpp>

Expand Down Expand Up @@ -85,6 +86,10 @@ struct SyncClientConfig {
// and creating WebSockets. If not provided the default implementation will be used.
std::shared_ptr<sync::SyncSocketProvider> socket_provider;

// Optional thread observer for event loop thread events in the default SyncSocketProvider
// implementation. It is not used for custom SyncSocketProvider implementations.
std::shared_ptr<BindingCallbackThreadObserver> default_socket_provider_thread_observer;

// {@
// Optional information about the binding/application that is sent as part of the User-Agent
// when establishing a connection to the server. These values are only used by the default
Expand Down
48 changes: 47 additions & 1 deletion src/realm/sync/binding_callback_thread_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,51 @@
#include <realm/sync/binding_callback_thread_observer.hpp>

namespace realm {
BindingCallbackThreadObserver* g_binding_callback_thread_observer = nullptr;

void BindingCallbackThreadObserver::call_did_create_thread(
const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr)
{
// Call into the observer ptr if not null, otherwise, use the global thread observer
if (observer_ptr)
observer_ptr->did_create_thread();
}

void BindingCallbackThreadObserver::call_will_destroy_thread(
const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr)
{
// Call into the observer ptr if not null, otherwise, use the global thread observer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there is no global observer? Do we still need these static functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were just helper functions... I'll remove them and just use the direct function calls in the default socket provider.

if (observer_ptr)
observer_ptr->will_destroy_thread();
}

bool BindingCallbackThreadObserver::call_handle_error(
const std::exception& e, const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr)
{
// Call into the observer ptr if not null, otherwise, use the global thread observer
if (observer_ptr)
return observer_ptr->handle_error(e);
return false;
}

void BindingCallbackThreadObserver::did_create_thread()
{
if (m_create_thread_callback) {
(*m_create_thread_callback)();
}
}

void BindingCallbackThreadObserver::will_destroy_thread()
{
if (m_destroy_thread_callback) {
(*m_destroy_thread_callback)();
}
}

bool BindingCallbackThreadObserver::handle_error(const std::exception& e)
{
if (!m_handle_error_callback)
return false;

return (*m_handle_error_callback)(e);
}
} // namespace realm
83 changes: 77 additions & 6 deletions src/realm/sync/binding_callback_thread_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,94 @@
#define REALM_OS_BINDING_CALLBACK_THREAD_OBSERVER_HPP

#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>


namespace realm {
// Interface for bindings interested in registering callbacks before/after the ObjectStore thread runs.
// This is for example helpful to attach/detach the pthread to the JavaVM in order to be able to perform JNI calls.
class BindingCallbackThreadObserver {
public:
struct BindingCallbackThreadObserver {
using NotificationCallback = std::function<void()>;
using ErrorCallback = std::function<bool(const std::exception&)>;

// Create a BindingCallbackThreadObserver that can be used in SyncClientConfig
BindingCallbackThreadObserver(std::optional<NotificationCallback>&& did_create_thread,
std::optional<NotificationCallback>&& will_destroy_thread,
std::optional<ErrorCallback>&& error_handler)
: m_create_thread_callback{std::move(did_create_thread)}
, m_destroy_thread_callback{std::move(will_destroy_thread)}
, m_handle_error_callback{std::move(error_handler)}
{
}

virtual ~BindingCallbackThreadObserver() = default;

///
/// Execution Functions - check for a valid instance and if the function was set
///

// BindingCallbackThreadObserver class that will call will_destroy_thread() when destroyed
struct ThreadGuard {
~ThreadGuard()
{
BindingCallbackThreadObserver::call_will_destroy_thread(m_observer);
}
// Constructor that only works with the global thread observer
ThreadGuard() = default;

// Constructor that works with either the local or global thread observer
ThreadGuard(const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr)
: m_observer{observer_ptr}
{
}

private:
std::shared_ptr<BindingCallbackThreadObserver> m_observer;
};

// This method is called just before the thread is started
virtual void did_create_thread() = 0;
// This takes an optional reference to an observer_ptr and will call that if non null, otherwise the
// global thread observer will be used.
static void call_did_create_thread(const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr);

// This method is called just before the thread is being destroyed
virtual void will_destroy_thread() = 0;
// This takes an optional reference to an observer_ptr and will call that if non null, otherwise the
// global thread observer will be used.
static void call_will_destroy_thread(const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr);

// This method is called with any exception thrown by client.run().
virtual void handle_error(std::exception const& e) = 0;
// This takes an optional reference to an observer_ptr and will call that if non null, otherwise the
// global thread observer will be used.
// Return true if the exception was handled by this function, otherwise false
static bool call_handle_error(const std::exception& e,
const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr);

protected:
// Default constructor
BindingCallbackThreadObserver() = default;

// Call the stored create thread callback function with the id of this thread
// Can be overridden to provide a custom implementation
virtual void did_create_thread();

// Call the stored destroy thread callback function with the id of this thread
// Can be overridden to provide a custom implementation
virtual void will_destroy_thread();

// Call the stored handle error callback function with the id of this thread
// Can be overridden to provide a custom implementation
// Return true if the exception was handled by this function, otherwise false
virtual bool handle_error(const std::exception& e);

std::optional<NotificationCallback> m_create_thread_callback;
std::optional<NotificationCallback> m_destroy_thread_callback;
std::optional<ErrorCallback> m_handle_error_callback;
};

extern BindingCallbackThreadObserver* g_binding_callback_thread_observer;
} // namespace realm

#endif // REALM_OS_BINDING_CALLBACK_THREAD_OBSERVER_HPP
1 change: 1 addition & 0 deletions src/realm/sync/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define REALM_SYNC_CLIENT_BASE_HPP

#include <realm/transaction.hpp>
#include <realm/sync/binding_callback_thread_observer.hpp>
#include <realm/sync/config.hpp>
#include <realm/sync/protocol.hpp>
#include <realm/sync/socket_provider.hpp>
Expand Down
Loading