diff --git a/fml/concurrent_message_loop.cc b/fml/concurrent_message_loop.cc index f553654a1195f..b8d78ba5a2b63 100644 --- a/fml/concurrent_message_loop.cc +++ b/fml/concurrent_message_loop.cc @@ -11,14 +11,15 @@ namespace fml { -std::shared_ptr ConcurrentMessageLoop::Create( +std::unique_ptr ConcurrentMessageLoop::Create( size_t worker_count) { - return std::shared_ptr{ + return std::unique_ptr{ new ConcurrentMessageLoop(worker_count)}; } ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) - : worker_count_(std::max(worker_count, 1ul)) { + : worker_count_(std::max(worker_count, 1ul)), + task_runner_(new ConcurrentTaskRunner(this)) { for (size_t i = 0; i < worker_count_; ++i) { workers_.emplace_back([i, this]() { fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig( @@ -33,6 +34,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) } ConcurrentMessageLoop::~ConcurrentMessageLoop() { + { + std::scoped_lock lock(task_runner_->weak_loop_mutex_); + task_runner_->weak_loop_ = nullptr; + } Terminate(); for (auto& worker : workers_) { worker.join(); @@ -43,33 +48,18 @@ size_t ConcurrentMessageLoop::GetWorkerCount() const { return worker_count_; } -std::shared_ptr ConcurrentMessageLoop::GetTaskRunner() { - return std::make_shared(weak_from_this()); -} - void ConcurrentMessageLoop::PostTask(const fml::closure& task) { if (!task) { return; } - std::unique_lock lock(tasks_mutex_); - - // Don't just drop tasks on the floor in case of shutdown. - if (shutdown_) { - FML_DLOG(WARNING) - << "Tried to post a task to shutdown concurrent message " - "loop. The task will be executed on the callers thread."; - lock.unlock(); - task(); - return; - } - - tasks_.push(task); - // Unlock the mutex before notifying the condition variable because that mutex // has to be acquired on the other thread anyway. Waiting in this scope till // it is acquired there is a pessimization. - lock.unlock(); + { + std::unique_lock lock(tasks_mutex_); + tasks_.push(task); + } tasks_condition_.notify_one(); } @@ -148,9 +138,8 @@ std::vector ConcurrentMessageLoop::GetThreadTasksLocked() { return pending_tasks; } -ConcurrentTaskRunner::ConcurrentTaskRunner( - std::weak_ptr weak_loop) - : weak_loop_(std::move(weak_loop)) {} +ConcurrentTaskRunner::ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop) + : weak_loop_(weak_loop) {} ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; @@ -159,15 +148,12 @@ void ConcurrentTaskRunner::PostTask(const fml::closure& task) { return; } - if (auto loop = weak_loop_.lock()) { - loop->PostTask(task); - return; + { + std::scoped_lock lock(weak_loop_mutex_); + if (weak_loop_) { + weak_loop_->PostTask(task); + } } - - FML_DLOG(WARNING) - << "Tried to post to a concurrent message loop that has already died. " - "Executing the task on the callers thread."; - task(); } bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() { diff --git a/fml/concurrent_message_loop.h b/fml/concurrent_message_loop.h index ab8534b6c946a..70d16830e26df 100644 --- a/fml/concurrent_message_loop.h +++ b/fml/concurrent_message_loop.h @@ -18,17 +18,17 @@ namespace fml { class ConcurrentTaskRunner; -class ConcurrentMessageLoop - : public std::enable_shared_from_this { +// This class is final for the logic in ~ConcurrentMessageLoop(). +class ConcurrentMessageLoop final { public: - static std::shared_ptr Create( + static std::unique_ptr Create( size_t worker_count = std::thread::hardware_concurrency()); ~ConcurrentMessageLoop(); size_t GetWorkerCount() const; - std::shared_ptr GetTaskRunner(); + std::shared_ptr GetTaskRunner() { return task_runner_; } void Terminate(); @@ -47,6 +47,7 @@ class ConcurrentMessageLoop std::vector worker_thread_ids_; std::map> thread_tasks_; bool shutdown_ = false; + std::shared_ptr task_runner_; explicit ConcurrentMessageLoop(size_t worker_count); @@ -63,7 +64,7 @@ class ConcurrentMessageLoop class ConcurrentTaskRunner : public BasicTaskRunner { public: - explicit ConcurrentTaskRunner(std::weak_ptr weak_loop); + explicit ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop); virtual ~ConcurrentTaskRunner(); @@ -72,7 +73,9 @@ class ConcurrentTaskRunner : public BasicTaskRunner { private: friend ConcurrentMessageLoop; - std::weak_ptr weak_loop_; + std::mutex weak_loop_mutex_; + // Raw pointer that is cleared out in ~ConcurrentMessageLoop. + ConcurrentMessageLoop* weak_loop_ FML_GUARDED_BY(weak_loop_mutex_); FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner); }; diff --git a/fml/macros.h b/fml/macros.h index 0158d0e18c73e..beb9bc3bab68d 100644 --- a/fml/macros.h +++ b/fml/macros.h @@ -38,4 +38,12 @@ TypeName() = delete; \ FML_DISALLOW_COPY_ASSIGN_AND_MOVE(TypeName) +#if defined(__clang__) +#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x)) +#else +#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op +#endif + +#define FML_GUARDED_BY(x) FML_THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x)) + #endif // FLUTTER_FML_MACROS_H_ diff --git a/impeller/playground/backend/vulkan/playground_impl_vk.cc b/impeller/playground/backend/vulkan/playground_impl_vk.cc index 84a2d50700228..3e340fcc4bdf4 100644 --- a/impeller/playground/backend/vulkan/playground_impl_vk.cc +++ b/impeller/playground/backend/vulkan/playground_impl_vk.cc @@ -49,9 +49,7 @@ void PlaygroundImplVK::DestroyWindowHandle(WindowHandle handle) { } PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches) - : PlaygroundImpl(switches), - concurrent_loop_(fml::ConcurrentMessageLoop::Create()), - handle_(nullptr, &DestroyWindowHandle) { + : PlaygroundImpl(switches), handle_(nullptr, &DestroyWindowHandle) { if (!::glfwVulkanSupported()) { #ifdef TARGET_OS_MAC VALIDATION_LOG << "Attempted to initialize a Vulkan playground on macOS " @@ -83,7 +81,8 @@ PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches) &::glfwGetInstanceProcAddress); context_settings.shader_libraries_data = ShaderLibraryMappingsForPlayground(); context_settings.cache_directory = fml::paths::GetCachesDirectory(); - context_settings.worker_task_runner = concurrent_loop_->GetTaskRunner(); + context_settings.worker_concurrent_loop = + fml::ConcurrentMessageLoop::Create(); context_settings.enable_validation = switches_.enable_vulkan_validation; auto context = ContextVK::Create(std::move(context_settings)); diff --git a/impeller/playground/backend/vulkan/playground_impl_vk.h b/impeller/playground/backend/vulkan/playground_impl_vk.h index bb0a7842a18c7..96ce4f956a0c0 100644 --- a/impeller/playground/backend/vulkan/playground_impl_vk.h +++ b/impeller/playground/backend/vulkan/playground_impl_vk.h @@ -18,7 +18,7 @@ class PlaygroundImplVK final : public PlaygroundImpl { ~PlaygroundImplVK(); private: - std::shared_ptr concurrent_loop_; + std::unique_ptr concurrent_loop_; std::shared_ptr context_; // Windows management. diff --git a/impeller/renderer/backend/vulkan/context_vk.cc b/impeller/renderer/backend/vulkan/context_vk.cc index d41ccd0a5fa5a..f6a0ee798c4db 100644 --- a/impeller/renderer/backend/vulkan/context_vk.cc +++ b/impeller/renderer/backend/vulkan/context_vk.cc @@ -111,6 +111,10 @@ ContextVK::~ContextVK() { [[maybe_unused]] auto result = device_->waitIdle(); } CommandPoolVK::ClearAllPools(this); + // Delete `worker_message_loop_` to ensure that ~ConcurrentMessageLoop() is + // executed before the instance variables are deleted. This will synchronize + // on joining ConcurrentMessageLoop's threads. + worker_message_loop_.reset(); } void ContextVK::Setup(Settings settings) { @@ -305,11 +309,12 @@ void ContextVK::Setup(Settings settings) { //---------------------------------------------------------------------------- /// Setup the pipeline library. /// + worker_message_loop_ = std::move(settings.worker_concurrent_loop); auto pipeline_library = std::shared_ptr( - new PipelineLibraryVK(device.value.get(), // - caps, // - std::move(settings.cache_directory), // - settings.worker_task_runner // + new PipelineLibraryVK(device.value.get(), // + caps, // + std::move(settings.cache_directory), // + worker_message_loop_->GetTaskRunner() // )); if (!pipeline_library->IsValid()) { diff --git a/impeller/renderer/backend/vulkan/context_vk.h b/impeller/renderer/backend/vulkan/context_vk.h index cc1376649bbbd..e302547ebcc2b 100644 --- a/impeller/renderer/backend/vulkan/context_vk.h +++ b/impeller/renderer/backend/vulkan/context_vk.h @@ -36,7 +36,7 @@ class ContextVK final : public Context, public BackendCast { PFN_vkGetInstanceProcAddr proc_address_callback = nullptr; std::vector> shader_libraries_data; fml::UniqueFD cache_directory; - std::shared_ptr worker_task_runner; + std::unique_ptr worker_concurrent_loop; bool enable_validation = false; Settings() = default; @@ -137,6 +137,7 @@ class ContextVK final : public Context, public BackendCast { std::shared_ptr fence_waiter_; std::string device_name_; const uint64_t hash_; + std::unique_ptr worker_message_loop_; bool is_valid_ = false; diff --git a/impeller/renderer/backend/vulkan/test/mock_vulkan.cc b/impeller/renderer/backend/vulkan/test/mock_vulkan.cc index 100b36b1cb175..4c0f8861c10b4 100644 --- a/impeller/renderer/backend/vulkan/test/mock_vulkan.cc +++ b/impeller/renderer/backend/vulkan/test/mock_vulkan.cc @@ -272,9 +272,7 @@ PFN_vkVoidFunction GetMockVulkanProcAddress(VkInstance instance, std::shared_ptr CreateMockVulkanContext(void) { ContextVK::Settings settings; - auto message_loop = fml::ConcurrentMessageLoop::Create(); - settings.worker_task_runner = - std::make_shared(message_loop); + settings.worker_concurrent_loop = fml::ConcurrentMessageLoop::Create(); settings.proc_address_callback = GetMockVulkanProcAddress; return ContextVK::Create(std::move(settings)); } diff --git a/shell/platform/android/android_surface_vulkan_impeller.cc b/shell/platform/android/android_surface_vulkan_impeller.cc index 0e5ac734e3b89..8833da717fd48 100644 --- a/shell/platform/android/android_surface_vulkan_impeller.cc +++ b/shell/platform/android/android_surface_vulkan_impeller.cc @@ -22,7 +22,7 @@ namespace flutter { static std::shared_ptr CreateImpellerContext( const fml::RefPtr& proc_table, - const std::shared_ptr& concurrent_loop, + std::unique_ptr concurrent_loop, bool enable_vulkan_validation) { std::vector> shader_mappings = { std::make_shared(impeller_entity_shaders_vk_data, @@ -40,7 +40,7 @@ static std::shared_ptr CreateImpellerContext( settings.proc_address_callback = instance_proc_addr; settings.shader_libraries_data = std::move(shader_mappings); settings.cache_directory = fml::paths::GetCachesDirectory(); - settings.worker_task_runner = concurrent_loop->GetTaskRunner(); + settings.worker_concurrent_loop = std::move(concurrent_loop); settings.enable_validation = enable_vulkan_validation; return impeller::ContextVK::Create(std::move(settings)); } @@ -50,10 +50,10 @@ AndroidSurfaceVulkanImpeller::AndroidSurfaceVulkanImpeller( const std::shared_ptr& jni_facade, bool enable_vulkan_validation) : AndroidSurface(android_context), - proc_table_(fml::MakeRefCounted()), - workers_(fml::ConcurrentMessageLoop::Create()) { + proc_table_(fml::MakeRefCounted()) { impeller_context_ = - CreateImpellerContext(proc_table_, workers_, enable_vulkan_validation); + CreateImpellerContext(proc_table_, fml::ConcurrentMessageLoop::Create(), + enable_vulkan_validation); is_valid_ = proc_table_->HasAcquiredMandatoryProcAddresses() && impeller_context_; } diff --git a/shell/platform/android/android_surface_vulkan_impeller.h b/shell/platform/android/android_surface_vulkan_impeller.h index 3fff43d9d974c..145fe24f7610f 100644 --- a/shell/platform/android/android_surface_vulkan_impeller.h +++ b/shell/platform/android/android_surface_vulkan_impeller.h @@ -50,7 +50,6 @@ class AndroidSurfaceVulkanImpeller : public AndroidSurface { private: fml::RefPtr proc_table_; fml::RefPtr native_window_; - std::shared_ptr workers_; std::shared_ptr impeller_context_; bool is_valid_ = false;