diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index 517bcb53670..57136525c81 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -163,10 +163,10 @@ class SharedMemGlobal MultiProducerConsumerRingBuffer* buffer; }; - static WatchTask& get() + static const std::shared_ptr& get() { - static WatchTask watch_task; - return watch_task; + static std::shared_ptr watch_task_instance(new WatchTask()); + return watch_task_instance; } /** @@ -202,19 +202,23 @@ class SharedMemGlobal } + virtual ~WatchTask() + { + shared_mem_watchdog_->remove_task(this); + } + private: std::vector > watched_ports_; std::mutex watched_ports_mutex_; - WatchTask() - { - SharedMemWatchdog::get().add_task(this); - } + // Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed + std::shared_ptr shared_mem_watchdog_; - ~WatchTask() + WatchTask() + : shared_mem_watchdog_(SharedMemWatchdog::get()) { - SharedMemWatchdog::get().remove_task(this); + shared_mem_watchdog_->add_task(this); } bool update_status_all_listeners( @@ -339,6 +343,9 @@ class SharedMemGlobal return (listeners_found == node_->num_listeners); } + // Keep a reference to the WatchTask so that it is not destroyed until the last Port instance is destroyed + std::shared_ptr watch_task_; + public: /** @@ -378,6 +385,7 @@ class SharedMemGlobal , node_(node) , overflows_count_(0) , read_exclusive_lock_(std::move(read_exclusive_lock)) + , watch_task_(WatchTask::get()) { auto buffer_base = static_cast::Cell*>( port_segment_->get_address_from_offset(node_->buffer)); @@ -392,12 +400,12 @@ class SharedMemGlobal auto port_context = std::make_shared(); *port_context = {port_segment_, node_, buffer_.get()}; - Port::WatchTask::get().add_port(std::move(port_context)); + Port::WatchTask::get()->add_port(std::move(port_context)); } ~Port() { - Port::WatchTask::get().remove_port(node_); + Port::WatchTask::get()->remove_port(node_); if (node_->ref_counter.fetch_sub(1) == 1) { diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp index ca274c6c188..4a3f651f533 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp @@ -229,6 +229,7 @@ class SharedMemManager : const std::string& domain_name) : segments_mem_(0) , global_segment_(domain_name) + , watch_task_(SegmentWrapper::WatchTask::get()) { static_assert(std::alignment_of::value % 8 == 0, "SharedMemManager::BufferNode bad alignment"); @@ -999,10 +1000,10 @@ class SharedMemManager : { public: - static WatchTask& get() + static std::shared_ptr& get() { - static WatchTask watch_task; - return watch_task; + static std::shared_ptr watch_task_instance(new WatchTask()); + return watch_task_instance; } void add_segment( @@ -1023,6 +1024,11 @@ class SharedMemManager : to_remove_.push_back(segment); } + virtual ~WatchTask() + { + shared_mem_watchdog_->remove_task(this); + } + private: std::unordered_map, uint32_t> watched_segments_; @@ -1032,15 +1038,14 @@ class SharedMemManager : std::vector > to_add_; std::vector > to_remove_; + // Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed + std::shared_ptr shared_mem_watchdog_; + WatchTask() : watched_it_(watched_segments_.end()) + , shared_mem_watchdog_(SharedMemWatchdog::get()) { - SharedMemWatchdog::get().add_task(this); - } - - ~WatchTask() - { - SharedMemWatchdog::get().remove_task(this); + shared_mem_watchdog_->add_task(this); } void update_watched_segments() @@ -1191,6 +1196,9 @@ class SharedMemManager : SharedMemGlobal global_segment_; + // Keep a reference to the WatchTask so that it is not destroyed until all Manger instances are destroyed + std::shared_ptr watch_task_; + std::shared_ptr find_segment( SharedMemSegment::Id id) { @@ -1212,7 +1220,7 @@ class SharedMemManager : ids_segments_[id.get()] = segment_wrapper; segments_mem_ += segment->mem_size(); - SegmentWrapper::WatchTask::get().add_segment(segment_wrapper); + SegmentWrapper::WatchTask::get()->add_segment(segment_wrapper); } return segment; @@ -1243,7 +1251,7 @@ class SharedMemManager : for (auto segment : ids_segments_) { - SegmentWrapper::WatchTask::get().remove_segment(segment.second); + SegmentWrapper::WatchTask::get()->remove_segment(segment.second); } } diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp index d74e95191bc..c0e395bc812 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp @@ -40,10 +40,10 @@ class SharedMemWatchdog virtual void run() = 0; }; - static SharedMemWatchdog& get() + static std::shared_ptr& get() { - static SharedMemWatchdog watch_dog; - return watch_dog; + static std::shared_ptr watch_dog_instance(new SharedMemWatchdog()); + return watch_dog_instance; } /** @@ -79,6 +79,13 @@ class SharedMemWatchdog return std::chrono::milliseconds(1000); } + ~SharedMemWatchdog() + { + exit_thread_ = true; + wake_up(); + thread_run_.join(); + } + private: std::unordered_set tasks_; @@ -98,13 +105,6 @@ class SharedMemWatchdog thread_run_ = std::thread(&SharedMemWatchdog::run, this); } - ~SharedMemWatchdog() - { - exit_thread_ = true; - wake_up(); - thread_run_.join(); - } - /** * Forces Wake-up of the checking thread */