Skip to content

Commit

Permalink
[core] active-registration-logic-getter (#1671)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky authored Jul 24, 2024
1 parent 9211a88 commit 9b5f082
Show file tree
Hide file tree
Showing 19 changed files with 328 additions and 288 deletions.
7 changes: 3 additions & 4 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,15 @@ namespace eCAL
}
}

void CPubGate::RefreshRegistrations()
void CPubGate::GetRegistrations(Registration::SampleList& reg_sample_list_)
{
if (!m_created) return;

// refresh publisher registrations
// read reader registrations
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
for (const auto& iter : m_topic_name_datawriter_map)
{
// force data writer to (re)register itself on registration provider
iter.second->RefreshRegistration();
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
}
}
}
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_pubgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace eCAL
void ApplySubRegistration(const Registration::Sample& ecal_sample_);
void ApplySubUnregistration(const Registration::Sample& ecal_sample_);

void RefreshRegistrations();
void GetRegistrations(Registration::SampleList& reg_sample_list_);

protected:
static std::atomic<bool> m_created;
Expand Down
7 changes: 3 additions & 4 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,15 @@ namespace eCAL
}
}

void CSubGate::RefreshRegistrations()
void CSubGate::GetRegistrations(Registration::SampleList& reg_sample_list_)
{
if (!m_created) return;

// refresh reader registrations
// read reader registrations
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (const auto& iter : m_topic_name_datareader_map)
{
// force data reader to (re)register itself on registration provider
iter.second->RefreshRegistration();
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
}
}
}
4 changes: 2 additions & 2 deletions ecal/core/src/pubsub/ecal_subgate.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,7 @@ namespace eCAL
void ApplyPubRegistration(const Registration::Sample& ecal_sample_);
void ApplyPubUnregistration(const Registration::Sample& ecal_sample_);

void RefreshRegistrations();
void GetRegistrations(Registration::SampleList& reg_sample_list_);

protected:
static std::atomic<bool> m_created;
Expand Down
106 changes: 51 additions & 55 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,28 +351,6 @@ namespace eCAL
}
}

void CDataReader::RefreshRegistration()
{
if (!m_created) return;

// ensure that registration is not called within zero nanoseconds
// normally it will be called from registration logic every second

// register without send
Register(false);

// check connection timeouts
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.erase_expired();

if (m_pub_map.empty())
{
FireDisconnectEvent();
}
}
}

void CDataReader::InitializeLayers()
{
// initialize udp layer
Expand Down Expand Up @@ -555,15 +533,56 @@ namespace eCAL
return(out.str());
}

bool CDataReader::Register(const bool force_)
void CDataReader::Register(bool force_)
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Register");
#endif
#endif // ECAL_CORE_REGISTRATION
}

void CDataReader::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (!m_created) return(false);
if(m_topic_name.empty()) return(false);
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister");
#endif
#endif // ECAL_CORE_REGISTRATION
}

void CDataReader::CheckConnections()
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.erase_expired();

// create command parameter
if (m_pub_map.empty())
{
FireDisconnectEvent();
}
}

Registration::Sample CDataReader::GetRegistration()
{
// check connection timeouts
CheckConnections();

// return registration
return GetRegistrationSample();
}

Registration::Sample CDataReader::GetRegistrationSample()
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_subscriber;

auto& ecal_reg_sample_topic = ecal_reg_sample.topic;
ecal_reg_sample_topic.hname = m_host_name;
ecal_reg_sample_topic.hgname = m_host_group_name;
Expand Down Expand Up @@ -632,28 +651,15 @@ namespace eCAL
ecal_reg_sample_topic.connections_loc = 0;
ecal_reg_sample_topic.connections_ext = 0;

// register subscriber
if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister");
#endif

return(true);
#else // ECAL_CORE_REGISTRATION
(void)force_;
return(false);
#endif // ECAL_CORE_REGISTRATION
return ecal_reg_sample;
}

bool CDataReader::Unregister()
Registration::Sample CDataReader::GetUnregistrationSample()
{
#if ECAL_CORE_REGISTRATION
if (m_topic_name.empty()) return(false);

// create command parameter
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_subscriber;

auto& ecal_reg_sample_topic = ecal_unreg_sample.topic;
ecal_reg_sample_topic.hname = m_host_name;
ecal_reg_sample_topic.hgname = m_host_group_name;
Expand All @@ -663,19 +669,9 @@ namespace eCAL
ecal_reg_sample_topic.tid = m_topic_id;
ecal_reg_sample_topic.uname = Process::GetUnitName();

// unregister subscriber
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false);
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister");
#endif

return(true);
#else // ECAL_CORE_REGISTRATION
return(false);
#endif // ECAL_CORE_REGISTRATION
return ecal_unreg_sample;
}

void CDataReader::StartTransportLayer()
{
#if ECAL_CORE_TRANSPORT_UDP
Expand Down
12 changes: 8 additions & 4 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ namespace eCAL

void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_);

void RefreshRegistration();

Registration::Sample GetRegistration();
bool IsCreated() const { return(m_created); }

bool IsPublished() const
Expand All @@ -126,8 +125,13 @@ namespace eCAL
std::string Dump(const std::string& indent_ = "");

protected:
bool Register(bool force_);
bool Unregister();
void Register(bool force_);
void Unregister();

void CheckConnections();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();

void StartTransportLayer();
void StopTransportLayer();
Expand Down
101 changes: 48 additions & 53 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,25 +547,6 @@ namespace eCAL
#endif
}

void CDataWriter::RefreshRegistration()
{
if (!m_created) return;

// register without send
Register(false);

// check connection timeouts
{
const std::lock_guard<std::mutex> lock(m_sub_map_mtx);
m_sub_map.erase_expired();

if (m_sub_map.empty())
{
FireDisconnectEvent();
}
}
}

void CDataWriter::RefreshSendCounter()
{
// increase write clock
Expand Down Expand Up @@ -605,13 +586,53 @@ namespace eCAL
return(out.str());
}

bool CDataWriter::Register(bool force_)
void CDataWriter::Register(bool force_)
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register");
#endif
#endif // ECAL_CORE_REGISTRATION
}

void CDataWriter::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (!m_created) return(false);
if (m_topic_name.empty()) return(false);
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false);

// create command parameter
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Unregister");
#endif
#endif // ECAL_CORE_REGISTRATION
}

void CDataWriter::CheckConnections()
{
const std::lock_guard<std::mutex> lock(m_sub_map_mtx);
m_sub_map.erase_expired();

if (m_sub_map.empty())
{
FireDisconnectEvent();
}
}

Registration::Sample CDataWriter::GetRegistration()
{
// check connection timeouts
CheckConnections();

if (m_created) return GetRegistrationSample();
else return GetUnregistrationSample();
}

Registration::Sample CDataWriter::GetRegistrationSample()
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_publisher;

Expand Down Expand Up @@ -701,27 +722,12 @@ namespace eCAL
ecal_reg_sample_topic.connections_loc = static_cast<int32_t>(loc_connections);
ecal_reg_sample_topic.connections_ext = static_cast<int32_t>(ext_connections);

// register publisher
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register");
#endif

return(true);
#else // ECAL_CORE_REGISTRATION
(void)force_;
return(false);
#endif // ECAL_CORE_REGISTRATION
return ecal_reg_sample;
}

bool CDataWriter::Unregister()
Registration::Sample CDataWriter::GetUnregistrationSample()
{
#if ECAL_CORE_REGISTRATION
if (m_topic_name.empty()) return(false);

// create command parameter
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_publisher;

Expand All @@ -734,18 +740,7 @@ return(false);
ecal_reg_sample_topic.tid = m_topic_id;
ecal_reg_sample_topic.uname = Process::GetUnitName();

// unregister publisher
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::UnRegister");
#endif

return(true);
#else // ECAL_CORE_REGISTRATION
return(false);
#endif // ECAL_CORE_REGISTRATION
return ecal_unreg_sample;
}

void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
Expand Down
Loading

0 comments on commit 9b5f082

Please sign in to comment.