Skip to content

Commit

Permalink
wrap the zenoh session with a shared pointer
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <ahcorde@gmail.com>
  • Loading branch information
ahcorde committed Dec 10, 2024
1 parent a6a09f4 commit f8e85b1
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 21 deletions.
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -174,7 +174,7 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(const std::shared_ptr<zenoh::Session> & session)
bool ClientData::init(const std::shared_ptr<zenoh::Session> session)
{
std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_;
keyexpr_ = zenoh::KeyExpr(topic_keyexpr);
Expand All @@ -192,6 +192,7 @@ bool ClientData::init(const std::shared_ptr<zenoh::Session> & session)
return false;
}

sess_ = session;
initialized_ = true;

return true;
Expand Down Expand Up @@ -515,6 +516,7 @@ void ClientData::_shutdown()
return;
}

sess_.reset();
is_shutdown_ = true;
}

Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr<ClientData> make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -120,7 +120,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const std::shared_ptr<zenoh::Session> & session);
bool init(const std::shared_ptr<zenoh::Session> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
Expand All @@ -132,6 +132,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<zenoh::Session> sess_;
// An owned keyexpression.
std::optional<zenoh::KeyExpr> keyexpr_;
// Liveliness token for the service.
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ class rmw_context_impl_s::Data final
return RMW_RET_ERROR;
}

session_.reset();
is_shutdown_ = true;

// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}
session_.reset();
return RMW_RET_OK;
}

Expand Down
10 changes: 5 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace rmw_zenoh_cpp
std::shared_ptr<NodeData> NodeData::make(
const rmw_node_t * const node,
std::size_t id,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
Expand Down Expand Up @@ -118,7 +118,7 @@ std::size_t NodeData::id() const
///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand Down Expand Up @@ -184,7 +184,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
bool NodeData::create_sub_data(
const rmw_subscription_t * const subscription,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand Down Expand Up @@ -252,7 +252,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
///=============================================================================
bool NodeData::create_service_data(
const rmw_service_t * const service,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down Expand Up @@ -319,7 +319,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service)
///=============================================================================
bool NodeData::create_client_data(
const rmw_client_t * const client,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down
10 changes: 5 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class NodeData final
static std::shared_ptr<NodeData> make(
const rmw_node_t * const node,
std::size_t id,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t domain_id,
const std::string & namespace_,
const std::string & node_name,
Expand All @@ -55,7 +55,7 @@ class NodeData final
// Create a new PublisherData for a given rmw_publisher_t.
bool create_pub_data(
const rmw_publisher_t * const publisher,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand All @@ -70,7 +70,7 @@ class NodeData final
// Create a new SubscriptionData for a given rmw_subscription_t.
bool create_sub_data(
const rmw_subscription_t * const subscription,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand All @@ -86,7 +86,7 @@ class NodeData final
// Create a new ServiceData for a given rmw_service_t.
bool create_service_data(
const rmw_service_t * const service,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand All @@ -101,7 +101,7 @@ class NodeData final
// Create a new ClientData for a given rmw_client_t.
bool create_client_data(
const rmw_client_t * const client,
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand Down
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace rmw_zenoh_cpp

///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -158,6 +158,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
new PublisherData{
node,
std::move(entity),
std::move(session),
std::move(pub),
std::move(pub_cache),
std::move(token),
Expand All @@ -170,13 +171,15 @@ std::shared_ptr<PublisherData> PublisherData::make(
PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
entity_(std::move(entity)),
sess_(std::move(sess)),
pub_(std::move(pub)),
pub_cache_(std::move(pub_cache)),
token_(std::move(token)),
Expand Down Expand Up @@ -421,6 +424,7 @@ rmw_ret_t PublisherData::shutdown()
return RMW_RET_ERROR;
}

sess_.reset();
is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PublisherData final
public:
// Make a shared_ptr of PublisherData.
static std::shared_ptr<PublisherData> make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -92,6 +92,7 @@ class PublisherData final
PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::LivelinessToken token,
Expand All @@ -104,6 +105,8 @@ class PublisherData final
const rmw_node_t * rmw_node_;
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<zenoh::Session> sess_;
// An owned publisher.
zenoh::Publisher pub_;
// Optional publication cache when durability is transient_local.
Expand Down
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ServiceData> ServiceData::make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -128,6 +128,7 @@ std::shared_ptr<ServiceData> ServiceData::make(
new ServiceData{
node,
std::move(entity),
session,
request_members,
response_members,
std::move(request_type_support),
Expand Down Expand Up @@ -191,12 +192,14 @@ std::shared_ptr<ServiceData> ServiceData::make(
ServiceData::ServiceData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
std::unique_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
entity_(std::move(entity)),
sess_(std::move(session)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(std::move(request_type_support)),
Expand Down Expand Up @@ -536,6 +539,7 @@ rmw_ret_t ServiceData::shutdown()
}
}

sess_.reset();
is_shutdown_ = true;
return RMW_RET_OK;
}
Expand Down
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
public:
// Make a shared_ptr of ServiceData.
static std::shared_ptr<ServiceData> make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -99,6 +99,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
ServiceData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
Expand All @@ -110,6 +111,9 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
const rmw_node_t * rmw_node_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;

// A shared session
std::shared_ptr<zenoh::Session> sess_;
// The keyexpr string.
std::string keyexpr_;
// An owned queryable.
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SubscriptionData::Message::~Message()

///=============================================================================
std::shared_ptr<SubscriptionData> SubscriptionData::make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::shared_ptr<GraphCache> graph_cache,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -123,6 +123,7 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
node,
graph_cache,
std::move(entity),
std::move(session),
type_support->data,
std::move(message_type_support)
});
Expand All @@ -140,11 +141,13 @@ SubscriptionData::SubscriptionData(
const rmw_node_t * rmw_node,
std::shared_ptr<GraphCache> graph_cache,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
graph_cache_(std::move(graph_cache)),
entity_(std::move(entity)),
sess_(std::move(session)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
last_known_published_msg_({}),
Expand Down Expand Up @@ -175,6 +178,8 @@ bool SubscriptionData::init()

rmw_context_impl_t * context_impl = static_cast<rmw_context_impl_t *>(rmw_node_->context->impl);

sess_ = context_impl->session();

// Instantiate the subscription with suitable options depending on the
// adapted_qos_profile.
// TODO(Yadunund): Rely on a separate function to return the sub
Expand Down
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD

// Make a shared_ptr of SubscriptionData.
static std::shared_ptr<SubscriptionData> make(
const std::shared_ptr<zenoh::Session> & session,
std::shared_ptr<zenoh::Session> session,
std::shared_ptr<GraphCache> graph_cache,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -126,6 +126,7 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
const rmw_node_t * rmw_node,
std::shared_ptr<GraphCache> graph_cache,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);

Expand All @@ -139,6 +140,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::shared_ptr<GraphCache> graph_cache_;
// The Entity generated for the subscription.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session
std::shared_ptr<zenoh::Session> sess_;
// An owned subscriber or querying_subscriber depending on the QoS settings.
std::optional<std::variant<zenoh::Subscriber<void>, zenoh::ext::QueryingSubscriber<void>>> sub_;
// Liveliness token for the subscription.
Expand Down

0 comments on commit f8e85b1

Please sign in to comment.