Skip to content

Commit

Permalink
[core] new IsPublished API for CSubscriber, improved state logic for …
Browse files Browse the repository at this point in the history
…IsPublished and GetPublisherCount
  • Loading branch information
rex-schilasky authored Jun 25, 2024
1 parent 55eb50f commit 35c1fd9
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 4 deletions.
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ namespace eCAL
**/
ECAL_API bool IsCreated() const {return(m_created);}

/**
* @brief Query if the subscriber is published.
*
* @return true if published, false if not.
**/
ECAL_API bool IsPublished() const;

/**
* @brief Query the number of publishers.
*
Expand Down
14 changes: 11 additions & 3 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -261,8 +261,16 @@ namespace eCAL
{
iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer);
}
// inform for publisher connection
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
// we only inform the subscriber when the publisher has already recognized at least one subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
const bool local_publication = publication_info.host_name == Process::GetHostName();
const bool external_publication = !local_publication;
const bool local_confirmed = local_publication && (ecal_sample_.topic.connections_loc > 0);
const bool external_confirmed = external_publication && (ecal_sample_.topic.connections_ext > 0);
if(local_confirmed || external_confirmed)
{
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

bool CSubscriber::IsPublished() const
{
if (m_datareader == nullptr) return(false);
return(m_datareader->IsPublished());
}

size_t CSubscriber::GetPublisherCount() const
{
if (m_datareader == nullptr) return(0);
Expand Down
96 changes: 95 additions & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,3 +216,97 @@ TEST(core_cpp_pubsub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::Publisher::Configuration pub_config;
pub_config.shm.acknowledge_timeout_ms = 500;
eCAL::CPublisher pub("blob", pub_config);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
if (sub.IsPublished()) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit 35c1fd9

Please sign in to comment.