Skip to content

Commit

Permalink
Fix singleton destruction order (port of #1748)
Browse files Browse the repository at this point in the history
Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>
  • Loading branch information
IkerLuengo committed Feb 12, 2021
1 parent 3da0cba commit d0d8ad9
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 38 deletions.
6 changes: 6 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/TypeObjectFactory.h>

#include <rtps/history/TopicPayloadPoolRegistry.hpp>

using namespace eprosima::fastrtps::xmlparser;

using eprosima::fastrtps::ParticipantAttributes;
Expand Down Expand Up @@ -97,6 +99,10 @@ DomainParticipantFactory::~DomainParticipantFactory()

DomainParticipantFactory* DomainParticipantFactory::get_instance()
{
// Keep a reference to the topic payload pool to avoid it to be destroyed before our own instance
using pool_registry_ref = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::reference;
static pool_registry_ref topic_pool_registry = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::instance();

static DomainParticipantFactory instance;
return &instance;
}
Expand Down
9 changes: 7 additions & 2 deletions src/cpp/rtps/history/TopicPayloadPoolRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

const TopicPayloadPoolRegistry::reference& TopicPayloadPoolRegistry::instance()
{
return detail::TopicPayloadPoolRegistry::instance();
}

std::shared_ptr<ITopicPayloadPool> TopicPayloadPoolRegistry::get(
const std::string& topic_name,
const BasicPoolConfig& config)
{
return detail::TopicPayloadPoolRegistry::instance().get(topic_name, config);
return detail::TopicPayloadPoolRegistry::instance()->get(topic_name, config);
}

void TopicPayloadPoolRegistry::release(
Expand All @@ -43,7 +48,7 @@ void TopicPayloadPoolRegistry::release(

if (topic_pool)
{
detail::TopicPayloadPoolRegistry::instance().release(topic_pool);
detail::TopicPayloadPoolRegistry::instance()->release(topic_pool);
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/history/TopicPayloadPoolRegistry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

namespace detail {
class TopicPayloadPoolRegistry;
} // namespace detail

class TopicPayloadPoolRegistry
{

public:

using reference = std::shared_ptr<detail::TopicPayloadPoolRegistry>;

static const reference& instance();

static std::shared_ptr<ITopicPayloadPool> get(
const std::string& topic_name,
const BasicPoolConfig& config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ class TopicPayloadPoolRegistry

public:

~TopicPayloadPoolRegistry() = default;

/// @return reference to singleton instance
static TopicPayloadPoolRegistry& instance()
static const std::shared_ptr<TopicPayloadPoolRegistry>& instance()
{
static TopicPayloadPoolRegistry pool_registry_instance;
static std::shared_ptr<TopicPayloadPoolRegistry> pool_registry_instance(new TopicPayloadPoolRegistry());
return pool_registry_instance;
}

Expand Down
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ class SharedMemGlobal
MultiProducerConsumerRingBuffer<BufferDescriptor>* buffer;
};

static WatchTask& get()
static const std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

/**
Expand Down Expand Up @@ -202,19 +202,23 @@ class SharedMemGlobal

}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::vector<std::shared_ptr<PortContext> > watched_ports_;
std::mutex watched_ports_mutex_;

WatchTask()
{
SharedMemWatchdog::get().add_task(this);
}
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

~WatchTask()
WatchTask()
: shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

bool update_status_all_listeners(
Expand Down Expand Up @@ -339,6 +343,9 @@ class SharedMemGlobal
return (listeners_found == node_->num_listeners);
}

// Keep a reference to the WatchTask so that it is not destroyed until the last Port instance is destroyed
std::shared_ptr<WatchTask> watch_task_;

public:

/**
Expand Down Expand Up @@ -378,6 +385,7 @@ class SharedMemGlobal
, node_(node)
, overflows_count_(0)
, read_exclusive_lock_(std::move(read_exclusive_lock))
, watch_task_(WatchTask::get())
{
auto buffer_base = static_cast<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell*>(
port_segment_->get_address_from_offset(node_->buffer));
Expand All @@ -392,12 +400,12 @@ class SharedMemGlobal

auto port_context = std::make_shared<Port::WatchTask::PortContext>();
*port_context = {port_segment_, node_, buffer_.get()};
Port::WatchTask::get().add_port(std::move(port_context));
Port::WatchTask::get()->add_port(std::move(port_context));
}

~Port()
{
Port::WatchTask::get().remove_port(node_);
Port::WatchTask::get()->remove_port(node_);

if (node_->ref_counter.fetch_sub(1) == 1)
{
Expand Down
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class SharedMemManager :
const std::string& domain_name)
: segments_mem_(0)
, global_segment_(domain_name)
, watch_task_(SegmentWrapper::WatchTask::get())
{
static_assert(std::alignment_of<BufferNode>::value % 8 == 0, "SharedMemManager::BufferNode bad alignment");

Expand Down Expand Up @@ -999,10 +1000,10 @@ class SharedMemManager :
{
public:

static WatchTask& get()
static std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

void add_segment(
Expand All @@ -1023,6 +1024,11 @@ class SharedMemManager :
to_remove_.push_back(segment);
}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;
Expand All @@ -1032,15 +1038,14 @@ class SharedMemManager :
std::vector<std::shared_ptr<SegmentWrapper> > to_add_;
std::vector<std::shared_ptr<SegmentWrapper> > to_remove_;

// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

WatchTask()
: watched_it_(watched_segments_.end())
, shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().add_task(this);
}

~WatchTask()
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

void update_watched_segments()
Expand Down Expand Up @@ -1191,6 +1196,9 @@ class SharedMemManager :

SharedMemGlobal global_segment_;

// Keep a reference to the WatchTask so that it is not destroyed until all Manger instances are destroyed
std::shared_ptr<SegmentWrapper::WatchTask> watch_task_;

std::shared_ptr<SharedMemSegment> find_segment(
SharedMemSegment::Id id)
{
Expand All @@ -1212,7 +1220,7 @@ class SharedMemManager :
ids_segments_[id.get()] = segment_wrapper;
segments_mem_ += segment->mem_size();

SegmentWrapper::WatchTask::get().add_segment(segment_wrapper);
SegmentWrapper::WatchTask::get()->add_segment(segment_wrapper);
}

return segment;
Expand Down Expand Up @@ -1243,7 +1251,7 @@ class SharedMemManager :

for (auto segment : ids_segments_)
{
SegmentWrapper::WatchTask::get().remove_segment(segment.second);
SegmentWrapper::WatchTask::get()->remove_segment(segment.second);
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class SharedMemWatchdog
virtual void run() = 0;
};

static SharedMemWatchdog& get()
static std::shared_ptr<SharedMemWatchdog>& get()
{
static SharedMemWatchdog watch_dog;
return watch_dog;
static std::shared_ptr<SharedMemWatchdog> watch_dog_instance(new SharedMemWatchdog());
return watch_dog_instance;
}

/**
Expand Down Expand Up @@ -79,6 +79,13 @@ class SharedMemWatchdog
return std::chrono::milliseconds(1000);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

private:

std::unordered_set<Task*> tasks_;
Expand All @@ -98,13 +105,6 @@ class SharedMemWatchdog
thread_run_ = std::thread(&SharedMemWatchdog::run, this);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

/**
* Forces Wake-up of the checking thread
*/
Expand Down
19 changes: 19 additions & 0 deletions test/unittest/dds/publisher/DataWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,25 @@ TEST(DataWriterTests, SetListener)
ASSERT_TRUE(DomainParticipantFactory::get_instance()->delete_participant(participant) == ReturnCode_t::RETCODE_OK);
}

TEST(DataWriterTests, TerminateWithoutDestroyingWriter)
{
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(participant, nullptr);

Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
ASSERT_NE(publisher, nullptr);

TypeSupport type(new TopicDataTypeMock());
type.register_type(participant);

Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT);
ASSERT_NE(topic, nullptr);

DataWriter* datawriter = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
ASSERT_NE(datawriter, nullptr);
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
Expand Down
21 changes: 21 additions & 0 deletions test/unittest/dds/subscriber/DataReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,27 @@ TEST(DataReaderTests, ReadData)



TEST(DataReaderTests, TerminateWithoutDestroyingReader)
{
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
ASSERT_NE(participant, nullptr);

Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
ASSERT_NE(subscriber, nullptr);

TypeSupport type(new TopicDataTypeMock());
type.register_type(participant);

Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT);
ASSERT_NE(topic, nullptr);

DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
ASSERT_NE(data_reader, nullptr);

// Do not destroy entities
}

void set_listener_test (
DataReader* reader,
DataReaderListener* listener,
Expand Down

0 comments on commit d0d8ad9

Please sign in to comment.