Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix singleton destruction order <master> [10533] #1748

Merged
merged 5 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/TypeObjectFactory.h>

#include <rtps/history/TopicPayloadPoolRegistry.hpp>

using namespace eprosima::fastrtps::xmlparser;

using eprosima::fastrtps::ParticipantAttributes;
Expand Down Expand Up @@ -97,6 +99,10 @@ DomainParticipantFactory::~DomainParticipantFactory()

DomainParticipantFactory* DomainParticipantFactory::get_instance()
{
// Keep a reference to the topic payload pool to avoid it to be destroyed before our own instance
using pool_registry_ref = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::reference;
static pool_registry_ref topic_pool_registry = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::instance();

static DomainParticipantFactory instance;
return &instance;
}
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/rtps/history/TopicPayloadPoolRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

const TopicPayloadPoolRegistry::reference& TopicPayloadPoolRegistry::instance()
{
return detail::TopicPayloadPoolRegistry::instance();
}

std::shared_ptr<ITopicPayloadPool> TopicPayloadPoolRegistry::get(
const std::string& topic_name,
const BasicPoolConfig& config)
{
return detail::TopicPayloadPoolRegistry::instance().get(topic_name, config);
return detail::TopicPayloadPoolRegistry::instance()->get(topic_name, config);
}

} // namespace rtps
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/history/TopicPayloadPoolRegistry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

namespace detail {
class TopicPayloadPoolRegistry;
} // namespace detail

class TopicPayloadPoolRegistry
{

public:

using reference = std::shared_ptr<detail::TopicPayloadPoolRegistry>;

static const reference& instance();

static std::shared_ptr<ITopicPayloadPool> get(
const std::string& topic_name,
const BasicPoolConfig& config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ class TopicPayloadPoolRegistry

public:

~TopicPayloadPoolRegistry() = default;

/// @return reference to singleton instance
static TopicPayloadPoolRegistry& instance()
static const std::shared_ptr<TopicPayloadPoolRegistry>& instance()
{
static TopicPayloadPoolRegistry pool_registry_instance;
static std::shared_ptr<TopicPayloadPoolRegistry> pool_registry_instance(new TopicPayloadPoolRegistry());
return pool_registry_instance;
}

Expand Down
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ class SharedMemGlobal
MultiProducerConsumerRingBuffer<BufferDescriptor>* buffer;
};

static WatchTask& get()
static const std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

/**
Expand Down Expand Up @@ -203,19 +203,23 @@ class SharedMemGlobal

}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::vector<std::shared_ptr<PortContext>> watched_ports_;
std::mutex watched_ports_mutex_;

WatchTask()
{
SharedMemWatchdog::get().add_task(this);
}
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

~WatchTask()
WatchTask()
: shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

bool update_status_all_listeners(
Expand Down Expand Up @@ -340,6 +344,9 @@ class SharedMemGlobal
return (listeners_found == node_->num_listeners);
}

// Keep a reference to the WatchTask so that it is not destroyed until the last Port instance is destroyed
std::shared_ptr<WatchTask> watch_task_;

public:

/**
Expand Down Expand Up @@ -379,6 +386,7 @@ class SharedMemGlobal
, node_(node)
, overflows_count_(0)
, read_exclusive_lock_(std::move(read_exclusive_lock))
, watch_task_(WatchTask::get())
{
auto buffer_base = static_cast<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell*>(
port_segment_->get_address_from_offset(node_->buffer));
Expand All @@ -393,12 +401,12 @@ class SharedMemGlobal

auto port_context = std::make_shared<Port::WatchTask::PortContext>();
*port_context = {port_segment_, node_, buffer_.get()};
Port::WatchTask::get().add_port(std::move(port_context));
Port::WatchTask::get()->add_port(std::move(port_context));
}

~Port()
{
Port::WatchTask::get().remove_port(node_);
Port::WatchTask::get()->remove_port(node_);

if (node_->ref_counter.fetch_sub(1) == 1)
{
Expand Down
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class SharedMemManager :
const std::string& domain_name)
: segments_mem_(0)
, global_segment_(domain_name)
, watch_task_(SegmentWrapper::WatchTask::get())
{
static_assert(std::alignment_of<BufferNode>::value % 8 == 0, "SharedMemManager::BufferNode bad alignment");

Expand Down Expand Up @@ -1000,10 +1001,10 @@ class SharedMemManager :
{
public:

static WatchTask& get()
static std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

void add_segment(
Expand All @@ -1024,6 +1025,11 @@ class SharedMemManager :
to_remove_.push_back(segment);
}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;
Expand All @@ -1033,15 +1039,14 @@ class SharedMemManager :
std::vector<std::shared_ptr<SegmentWrapper>> to_add_;
std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;

// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

WatchTask()
: watched_it_(watched_segments_.end())
, shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().add_task(this);
}

~WatchTask()
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

void update_watched_segments()
Expand Down Expand Up @@ -1192,6 +1197,9 @@ class SharedMemManager :

SharedMemGlobal global_segment_;

// Keep a reference to the WatchTask so that it is not destroyed until all Manger instances are destroyed
std::shared_ptr<SegmentWrapper::WatchTask> watch_task_;

std::shared_ptr<SharedMemSegment> find_segment(
SharedMemSegment::Id id)
{
Expand All @@ -1213,7 +1221,7 @@ class SharedMemManager :
ids_segments_[id.get()] = segment_wrapper;
segments_mem_ += segment->mem_size();

SegmentWrapper::WatchTask::get().add_segment(segment_wrapper);
SegmentWrapper::WatchTask::get()->add_segment(segment_wrapper);
}

return segment;
Expand Down Expand Up @@ -1244,7 +1252,7 @@ class SharedMemManager :

for (auto segment : ids_segments_)
{
SegmentWrapper::WatchTask::get().remove_segment(segment.second);
SegmentWrapper::WatchTask::get()->remove_segment(segment.second);
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/cpp/utils/shared_memory/SharedMemWatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class SharedMemWatchdog
virtual void run() = 0;
};

static SharedMemWatchdog& get()
static std::shared_ptr<SharedMemWatchdog>& get()
{
static SharedMemWatchdog watch_dog;
return watch_dog;
static std::shared_ptr<SharedMemWatchdog> watch_dog_instance(new SharedMemWatchdog());
return watch_dog_instance;
}

/**
Expand Down Expand Up @@ -79,6 +79,13 @@ class SharedMemWatchdog
return std::chrono::milliseconds(1000);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

private:

std::unordered_set<Task*> tasks_;
Expand All @@ -98,13 +105,6 @@ class SharedMemWatchdog
thread_run_ = std::thread(&SharedMemWatchdog::run, this);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

/**
* Forces Wake-up of the checking thread
*/
Expand Down
19 changes: 19 additions & 0 deletions test/unittest/dds/publisher/DataWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,25 @@ TEST(DataWriterTests, SetListener)
ASSERT_TRUE(DomainParticipantFactory::get_instance()->delete_participant(participant) == ReturnCode_t::RETCODE_OK);
}

TEST(DataWriterTests, TerminateWithoutDestroyingWriter)
{
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(participant, nullptr);

Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(publisher, nullptr);

TypeSupport type(new TopicDataTypeMock());
type.register_type(participant);

Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT);
ASSERT_NE(topic, nullptr);

DataWriter* datawriter = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
ASSERT_NE(datawriter, nullptr);
}

struct LoanableType
{
static constexpr uint32_t initialization_value()
Expand Down
12 changes: 12 additions & 0 deletions test/unittest/dds/subscriber/DataReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DataReaderTests : public ::testing::Test

void TearDown() override
{
if (!destroy_entities_)
{
return;
}

if (data_writer_)
{
ASSERT_EQ(publisher_->delete_datawriter(data_writer_), ReturnCode_t::RETCODE_OK);
Expand Down Expand Up @@ -528,6 +533,7 @@ class DataReaderTests : public ::testing::Test
DataReader* data_reader_ = nullptr;
DataWriter* data_writer_ = nullptr;
TypeSupport type_;
bool destroy_entities_ = true;

InstanceHandle_t handle_ok_ = HANDLE_NIL;
InstanceHandle_t handle_wrong_ = HANDLE_NIL;
Expand Down Expand Up @@ -1272,6 +1278,12 @@ TEST_F(DataReaderTests, read_unread)
}
}

TEST_F(DataReaderTests, TerminateWithoutDestroyingReader)
{
destroy_entities_ = false;
create_entities();
}

void set_listener_test (
DataReader* reader,
DataReaderListener* listener,
Expand Down