Skip to content

Commit

Permalink
Fix singleton destruction order (port of #1748)
Browse files Browse the repository at this point in the history
Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>
  • Loading branch information
IkerLuengo committed Feb 12, 2021
1 parent 9aaa1e6 commit a5f3454
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 32 deletions.
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ class SharedMemGlobal
MultiProducerConsumerRingBuffer<BufferDescriptor>* buffer;
};

static WatchTask& get()
static const std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

/**
Expand Down Expand Up @@ -202,19 +202,23 @@ class SharedMemGlobal

}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::vector<std::shared_ptr<PortContext> > watched_ports_;
std::mutex watched_ports_mutex_;

WatchTask()
{
SharedMemWatchdog::get().add_task(this);
}
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

~WatchTask()
WatchTask()
: shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

bool update_status_all_listeners(
Expand Down Expand Up @@ -339,6 +343,9 @@ class SharedMemGlobal
return (listeners_found == node_->num_listeners);
}

// Keep a reference to the WatchTask so that it is not destroyed until the last Port instance is destroyed
std::shared_ptr<WatchTask> watch_task_;

public:

/**
Expand Down Expand Up @@ -378,6 +385,7 @@ class SharedMemGlobal
, node_(node)
, overflows_count_(0)
, read_exclusive_lock_(std::move(read_exclusive_lock))
, watch_task_(WatchTask::get())
{
auto buffer_base = static_cast<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell*>(
port_segment_->get_address_from_offset(node_->buffer));
Expand All @@ -392,12 +400,12 @@ class SharedMemGlobal

auto port_context = std::make_shared<Port::WatchTask::PortContext>();
*port_context = {port_segment_, node_, buffer_.get()};
Port::WatchTask::get().add_port(std::move(port_context));
Port::WatchTask::get()->add_port(std::move(port_context));
}

~Port()
{
Port::WatchTask::get().remove_port(node_);
Port::WatchTask::get()->remove_port(node_);

if (node_->ref_counter.fetch_sub(1) == 1)
{
Expand Down
30 changes: 19 additions & 11 deletions src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class SharedMemManager :
const std::string& domain_name)
: segments_mem_(0)
, global_segment_(domain_name)
, watch_task_(SegmentWrapper::WatchTask::get())
{
static_assert(std::alignment_of<BufferNode>::value % 8 == 0, "SharedMemManager::BufferNode bad alignment");

Expand Down Expand Up @@ -999,10 +1000,10 @@ class SharedMemManager :
{
public:

static WatchTask& get()
static std::shared_ptr<WatchTask>& get()
{
static WatchTask watch_task;
return watch_task;
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
return watch_task_instance;
}

void add_segment(
Expand All @@ -1023,6 +1024,11 @@ class SharedMemManager :
to_remove_.push_back(segment);
}

virtual ~WatchTask()
{
shared_mem_watchdog_->remove_task(this);
}

private:

std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;
Expand All @@ -1032,15 +1038,14 @@ class SharedMemManager :
std::vector<std::shared_ptr<SegmentWrapper> > to_add_;
std::vector<std::shared_ptr<SegmentWrapper> > to_remove_;

// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;

WatchTask()
: watched_it_(watched_segments_.end())
, shared_mem_watchdog_(SharedMemWatchdog::get())
{
SharedMemWatchdog::get().add_task(this);
}

~WatchTask()
{
SharedMemWatchdog::get().remove_task(this);
shared_mem_watchdog_->add_task(this);
}

void update_watched_segments()
Expand Down Expand Up @@ -1191,6 +1196,9 @@ class SharedMemManager :

SharedMemGlobal global_segment_;

// Keep a reference to the WatchTask so that it is not destroyed until all Manager instances are destroyed
std::shared_ptr<SegmentWrapper::WatchTask> watch_task_;

std::shared_ptr<SharedMemSegment> find_segment(
SharedMemSegment::Id id)
{
Expand All @@ -1212,7 +1220,7 @@ class SharedMemManager :
ids_segments_[id.get()] = segment_wrapper;
segments_mem_ += segment->mem_size();

SegmentWrapper::WatchTask::get().add_segment(segment_wrapper);
SegmentWrapper::WatchTask::get()->add_segment(segment_wrapper);
}

return segment;
Expand Down Expand Up @@ -1243,7 +1251,7 @@ class SharedMemManager :

for (auto segment : ids_segments_)
{
SegmentWrapper::WatchTask::get().remove_segment(segment.second);
SegmentWrapper::WatchTask::get()->remove_segment(segment.second);
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class SharedMemWatchdog
virtual void run() = 0;
};

static SharedMemWatchdog& get()
static std::shared_ptr<SharedMemWatchdog>& get()
{
static SharedMemWatchdog watch_dog;
return watch_dog;
static std::shared_ptr<SharedMemWatchdog> watch_dog_instance(new SharedMemWatchdog());
return watch_dog_instance;
}

/**
Expand Down Expand Up @@ -79,6 +79,13 @@ class SharedMemWatchdog
return std::chrono::milliseconds(1000);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

private:

std::unordered_set<Task*> tasks_;
Expand All @@ -98,13 +105,6 @@ class SharedMemWatchdog
thread_run_ = std::thread(&SharedMemWatchdog::run, this);
}

~SharedMemWatchdog()
{
exit_thread_ = true;
wake_up();
thread_run_.join();
}

/**
* Forces Wake-up of the checking thread
*/
Expand Down

0 comments on commit a5f3454

Please sign in to comment.