This repository was archived by the owner on Feb 25, 2025. It is now read-only.
52 changes: 19 additions & 33 deletions fml/concurrent_message_loop.cc
@@ -11,14 +11,15 @@

namespace fml {

std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
std::unique_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
size_t worker_count) {
return std::shared_ptr<ConcurrentMessageLoop>{
return std::unique_ptr<ConcurrentMessageLoop>{
new ConcurrentMessageLoop(worker_count)};
}

ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
: worker_count_(std::max<size_t>(worker_count, 1ul)) {
: worker_count_(std::max<size_t>(worker_count, 1ul)),
task_runner_(new ConcurrentTaskRunner(this)) {
for (size_t i = 0; i < worker_count_; ++i) {
workers_.emplace_back([i, this]() {
fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig(
@@ -33,6 +34,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
}

ConcurrentMessageLoop::~ConcurrentMessageLoop() {
{
std::scoped_lock lock(task_runner_->weak_loop_mutex_);
task_runner_->weak_loop_ = nullptr;
}
Terminate();
for (auto& worker : workers_) {
worker.join();
@@ -43,33 +48,18 @@ size_t ConcurrentMessageLoop::GetWorkerCount() const {
return worker_count_;
}

std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
}

void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
if (!task) {
return;
}

std::unique_lock lock(tasks_mutex_);

// Don't just drop tasks on the floor in case of shutdown.
if (shutdown_) {
FML_DLOG(WARNING)
<< "Tried to post a task to shutdown concurrent message "
"loop. The task will be executed on the callers thread.";
lock.unlock();
task();
return;
}
Comment on lines -57 to -65 (Member, Author):

Tasks could always be dropped when shutting down a ConcurrentMessageLoop; there was no guarantee that they would all be executed, despite a few places that attempted to provide one. Specifically, if more than N tasks were queued up and the loop was deleted, task[N+1, ..] would be dropped.

Dropping late tasks matches the behavior elsewhere in the engine (for example, platform channel messages).

Contributor:

Mainly notes for myself:

  • This is ok because we don't give these workers to Dart. We only use them internally for SKP warmup and for image decoding.


tasks_.push(task);

// Unlock the mutex before notifying the condition variable because that mutex
// has to be acquired on the other thread anyway. Waiting in this scope till
// it is acquired there is a pessimization.
lock.unlock();
{
std::unique_lock lock(tasks_mutex_);
tasks_.push(task);
}

tasks_condition_.notify_one();
}
@@ -148,9 +138,8 @@ std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
return pending_tasks;
}

ConcurrentTaskRunner::ConcurrentTaskRunner(
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
: weak_loop_(std::move(weak_loop)) {}
ConcurrentTaskRunner::ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop)
: weak_loop_(weak_loop) {}

ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;

@@ -159,15 +148,12 @@ void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
return;
}

if (auto loop = weak_loop_.lock()) {
loop->PostTask(task);
return;
{
std::scoped_lock lock(weak_loop_mutex_);
Member (Author):

This adds a small amount of overhead compared to the shared_ptr, but we could share the tasks_mutex_ if we were willing to add extra complexity.

if (weak_loop_) {
weak_loop_->PostTask(task);
Member:

There may be undefined behavior if a method like ConcurrentMessageLoop::PostTask is called on one thread after the object has begun deletion on a different thread.

At the point where this call happens, another thread may have already dropped the last reference to the ConcurrentMessageLoop, invoked delete, and entered the ConcurrentMessageLoop destructor.

The ConcurrentMessageLoop destructor will then try to acquire the weak_loop_mutex_, so the destructor will not make any further progress before the ConcurrentMessageLoop::PostTask call completes. However, I don't think it's guaranteed to be safe to execute the ConcurrentMessageLoop::PostTask call while the ConcurrentMessageLoop is in that state.

Member (Author):

> There may be undefined behavior if a method like ConcurrentMessageLoop::PostTask is called on one thread after the object has begun deletion on a different thread.

Yep, good observation. In order to get undefined behavior, ConcurrentMessageLoop would need virtual methods and a subclass. That would mean the subclass could already have been deallocated while another thread is executing PostTask. ConcurrentMessageLoop doesn't have virtual methods, but that's easy to add and would accidentally introduce undefined behavior, so I've made the class final to avoid this.

The order of operations during deallocation is well defined. At the point where we are clearing out the reference, the ConcurrentMessageLoop instance variables will still be valid and the memory for `this` will still be around. Since the class is final, we know this is the very first thing that will be called.

Let me know if you think we are still missing something.

}
}

FML_DLOG(WARNING)
<< "Tried to post to a concurrent message loop that has already died. "
"Executing the task on the caller's thread.";
task();
}

bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() {
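The diff and review thread for this file can be condensed into a standalone sketch of the ownership pattern the PR adopts: the loop owns its runner via shared_ptr, the runner holds a raw back-pointer guarded by a mutex, and the loop's destructor nulls that pointer before anything else runs, which is why the class must be final. The names here (`Loop`, `Runner`, `Post`) are illustrative, not the fml API, and unlike the real ConcurrentTaskRunner this sketch rejects late posts instead of running them on the caller's thread:

```cpp
#include <functional>
#include <memory>
#include <mutex>

class Loop;

class Runner {
 public:
  explicit Runner(Loop* loop) : loop_(loop) {}
  bool Post(const std::function<void()>& task);  // Defined after Loop.

 private:
  friend class Loop;
  std::mutex mutex_;
  Loop* loop_;  // Raw back-pointer; cleared by ~Loop() under mutex_.
};

// final: no subclass can be mid-destruction while Post runs (see review).
class Loop final {
 public:
  Loop() : runner_(std::make_shared<Runner>(this)) {}
  ~Loop() {
    // First thing on destruction: sever the back-pointer under the lock.
    std::scoped_lock lock(runner_->mutex_);
    runner_->loop_ = nullptr;
  }
  std::shared_ptr<Runner> GetRunner() { return runner_; }
  void PostTask(const std::function<void()>& task) { task(); }  // Stand-in.

 private:
  std::shared_ptr<Runner> runner_;
};

bool Runner::Post(const std::function<void()>& task) {
  std::scoped_lock lock(mutex_);
  if (loop_ == nullptr) {
    return false;  // Loop already destroyed; the post is rejected.
  }
  loop_->PostTask(task);
  return true;
}
```

Because `~Loop()` takes `mutex_` before touching anything else, a concurrent `Post` either sees a valid `loop_` and completes before destruction proceeds, or sees nullptr.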
15 changes: 9 additions & 6 deletions fml/concurrent_message_loop.h
@@ -18,17 +18,17 @@ namespace fml {

class ConcurrentTaskRunner;

class ConcurrentMessageLoop
: public std::enable_shared_from_this<ConcurrentMessageLoop> {
// This class is final for the logic in ~ConcurrentMessageLoop().
class ConcurrentMessageLoop final {
public:
static std::shared_ptr<ConcurrentMessageLoop> Create(
static std::unique_ptr<ConcurrentMessageLoop> Create(
size_t worker_count = std::thread::hardware_concurrency());

~ConcurrentMessageLoop();

size_t GetWorkerCount() const;

std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner();
std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner() { return task_runner_; }

void Terminate();

@@ -47,6 +47,7 @@ class ConcurrentMessageLoop
std::vector<std::thread::id> worker_thread_ids_;
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_;
bool shutdown_ = false;
std::shared_ptr<ConcurrentTaskRunner> task_runner_;

explicit ConcurrentMessageLoop(size_t worker_count);

@@ -63,7 +64,7 @@ class ConcurrentMessageLoop

class ConcurrentTaskRunner : public BasicTaskRunner {
public:
explicit ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop);
explicit ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop);

virtual ~ConcurrentTaskRunner();

Expand All @@ -72,7 +73,9 @@ class ConcurrentTaskRunner : public BasicTaskRunner {
private:
friend ConcurrentMessageLoop;

std::weak_ptr<ConcurrentMessageLoop> weak_loop_;
std::mutex weak_loop_mutex_;
// Raw pointer that is cleared out in ~ConcurrentMessageLoop.
ConcurrentMessageLoop* weak_loop_ FML_GUARDED_BY(weak_loop_mutex_);

FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner);
};
8 changes: 8 additions & 0 deletions fml/macros.h
@@ -38,4 +38,12 @@
TypeName() = delete; \
FML_DISALLOW_COPY_ASSIGN_AND_MOVE(TypeName)

#if defined(__clang__)
#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
#endif

#define FML_GUARDED_BY(x) FML_THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))

#endif // FLUTTER_FML_MACROS_H_
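The new FML_GUARDED_BY macro wraps Clang's `guarded_by` thread-safety attribute, which lets the compiler flag accesses to a member made without holding the named mutex. A self-contained sketch of how such an annotation is used (the local `GUARDED_BY` macro here mirrors FML_GUARDED_BY so the snippet compiles anywhere):

```cpp
#include <mutex>

// No-op on non-Clang compilers, same as the FML macro above.
#if defined(__clang__)
#define GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define GUARDED_BY(x)
#endif

class Counter {
 public:
  void Increment() {
    std::scoped_lock lock(mutex_);
    value_++;  // OK: mutex_ is held.
  }

  int Get() {
    std::scoped_lock lock(mutex_);
    return value_;
  }

 private:
  std::mutex mutex_;
  // Reading or writing value_ without holding mutex_ can be diagnosed by
  // Clang's thread-safety analysis.
  int value_ GUARDED_BY(mutex_) = 0;
};
```

Note that the diagnostics only fire when building with Clang and `-Wthread-safety`, and only against lock types the analysis recognizes (libc++'s `std::mutex` is annotated when thread-safety annotations are enabled); otherwise the attribute is inert documentation.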
7 changes: 3 additions & 4 deletions impeller/playground/backend/vulkan/playground_impl_vk.cc
@@ -49,9 +49,7 @@ void PlaygroundImplVK::DestroyWindowHandle(WindowHandle handle) {
}

PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
: PlaygroundImpl(switches),
concurrent_loop_(fml::ConcurrentMessageLoop::Create()),
handle_(nullptr, &DestroyWindowHandle) {
: PlaygroundImpl(switches), handle_(nullptr, &DestroyWindowHandle) {
if (!::glfwVulkanSupported()) {
#ifdef TARGET_OS_MAC
VALIDATION_LOG << "Attempted to initialize a Vulkan playground on macOS "
@@ -83,7 +81,8 @@ PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
&::glfwGetInstanceProcAddress);
context_settings.shader_libraries_data = ShaderLibraryMappingsForPlayground();
context_settings.cache_directory = fml::paths::GetCachesDirectory();
context_settings.worker_task_runner = concurrent_loop_->GetTaskRunner();
context_settings.worker_concurrent_loop =
fml::ConcurrentMessageLoop::Create();
context_settings.enable_validation = switches_.enable_vulkan_validation;

auto context = ContextVK::Create(std::move(context_settings));
2 changes: 1 addition & 1 deletion impeller/playground/backend/vulkan/playground_impl_vk.h
@@ -18,7 +18,7 @@ class PlaygroundImplVK final : public PlaygroundImpl {
~PlaygroundImplVK();

private:
std::shared_ptr<fml::ConcurrentMessageLoop> concurrent_loop_;
std::unique_ptr<fml::ConcurrentMessageLoop> concurrent_loop_;
std::shared_ptr<Context> context_;

// Windows management.
13 changes: 9 additions & 4 deletions impeller/renderer/backend/vulkan/context_vk.cc
@@ -111,6 +111,10 @@ ContextVK::~ContextVK() {
[[maybe_unused]] auto result = device_->waitIdle();
}
CommandPoolVK::ClearAllPools(this);
// Delete `worker_message_loop_` to ensure that ~ConcurrentMessageLoop() is
// executed before the instance variables are deleted. This will synchronize
// on joining ConcurrentMessageLoop's threads.
worker_message_loop_.reset();
}
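The explicit `worker_message_loop_.reset()` matters because C++ destroys members in reverse declaration order: without it, members declared after the loop could be torn down while worker threads still reference them. A hypothetical miniature of the hazard and the fix (`Pool`, `Context`, and `data_` are illustrative names, not the Impeller API):

```cpp
#include <functional>
#include <memory>
#include <thread>

// Joins its worker on destruction, like ~ConcurrentMessageLoop joins workers.
struct Pool {
  std::thread worker;
  explicit Pool(std::function<void()> fn) : worker(std::move(fn)) {}
  ~Pool() { worker.join(); }
};

class Context {
 public:
  explicit Context(int* out) {
    data_ = std::make_unique<int>(42);
    pool_ = std::make_unique<Pool>([this, out] { *out = *data_; });
  }

  ~Context() {
    // Join the worker while data_ is still alive. Without this reset, data_
    // (declared after pool_) would be destroyed first, and the worker could
    // read freed memory.
    pool_.reset();
  }

 private:
  std::unique_ptr<Pool> pool_;  // Declared first, so destroyed last.
  std::unique_ptr<int> data_;
};
```

Resetting the pool first synchronizes on thread exit before any other member is released, which is exactly what the comment in the diff above describes.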

void ContextVK::Setup(Settings settings) {
@@ -305,11 +309,12 @@ void ContextVK::Setup(Settings settings) {
//----------------------------------------------------------------------------
/// Setup the pipeline library.
///
worker_message_loop_ = std::move(settings.worker_concurrent_loop);
auto pipeline_library = std::shared_ptr<PipelineLibraryVK>(
new PipelineLibraryVK(device.value.get(), //
caps, //
std::move(settings.cache_directory), //
settings.worker_task_runner //
new PipelineLibraryVK(device.value.get(), //
caps, //
std::move(settings.cache_directory), //
worker_message_loop_->GetTaskRunner() //
));

if (!pipeline_library->IsValid()) {
3 changes: 2 additions & 1 deletion impeller/renderer/backend/vulkan/context_vk.h
@@ -36,7 +36,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
PFN_vkGetInstanceProcAddr proc_address_callback = nullptr;
std::vector<std::shared_ptr<fml::Mapping>> shader_libraries_data;
fml::UniqueFD cache_directory;
std::shared_ptr<fml::ConcurrentTaskRunner> worker_task_runner;
std::unique_ptr<fml::ConcurrentMessageLoop> worker_concurrent_loop;
bool enable_validation = false;

Settings() = default;
@@ -137,6 +137,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
std::shared_ptr<FenceWaiterVK> fence_waiter_;
std::string device_name_;
const uint64_t hash_;
std::unique_ptr<fml::ConcurrentMessageLoop> worker_message_loop_;

bool is_valid_ = false;

4 changes: 1 addition & 3 deletions impeller/renderer/backend/vulkan/test/mock_vulkan.cc
@@ -272,9 +272,7 @@ PFN_vkVoidFunction GetMockVulkanProcAddress(VkInstance instance,

std::shared_ptr<ContextVK> CreateMockVulkanContext(void) {
ContextVK::Settings settings;
auto message_loop = fml::ConcurrentMessageLoop::Create();
settings.worker_task_runner =
std::make_shared<fml::ConcurrentTaskRunner>(message_loop);
settings.worker_concurrent_loop = fml::ConcurrentMessageLoop::Create();
settings.proc_address_callback = GetMockVulkanProcAddress;
return ContextVK::Create(std::move(settings));
}
10 changes: 5 additions & 5 deletions shell/platform/android/android_surface_vulkan_impeller.cc
@@ -22,7 +22,7 @@ namespace flutter {

static std::shared_ptr<impeller::Context> CreateImpellerContext(
const fml::RefPtr<vulkan::VulkanProcTable>& proc_table,
const std::shared_ptr<fml::ConcurrentMessageLoop>& concurrent_loop,
std::unique_ptr<fml::ConcurrentMessageLoop> concurrent_loop,
bool enable_vulkan_validation) {
std::vector<std::shared_ptr<fml::Mapping>> shader_mappings = {
std::make_shared<fml::NonOwnedMapping>(impeller_entity_shaders_vk_data,
@@ -40,7 +40,7 @@ static std::shared_ptr<impeller::Context> CreateImpellerContext(
settings.proc_address_callback = instance_proc_addr;
settings.shader_libraries_data = std::move(shader_mappings);
settings.cache_directory = fml::paths::GetCachesDirectory();
settings.worker_task_runner = concurrent_loop->GetTaskRunner();
settings.worker_concurrent_loop = std::move(concurrent_loop);
settings.enable_validation = enable_vulkan_validation;
return impeller::ContextVK::Create(std::move(settings));
}
@@ -50,10 +50,10 @@ AndroidSurfaceVulkanImpeller::AndroidSurfaceVulkanImpeller(
const std::shared_ptr<PlatformViewAndroidJNI>& jni_facade,
bool enable_vulkan_validation)
: AndroidSurface(android_context),
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()),
workers_(fml::ConcurrentMessageLoop::Create()) {
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()) {
impeller_context_ =
CreateImpellerContext(proc_table_, workers_, enable_vulkan_validation);
CreateImpellerContext(proc_table_, fml::ConcurrentMessageLoop::Create(),
enable_vulkan_validation);
is_valid_ =
proc_table_->HasAcquiredMandatoryProcAddresses() && impeller_context_;
}
1 change: 0 additions & 1 deletion shell/platform/android/android_surface_vulkan_impeller.h
@@ -50,7 +50,6 @@ class AndroidSurfaceVulkanImpeller : public AndroidSurface {
private:
fml::RefPtr<vulkan::VulkanProcTable> proc_table_;
fml::RefPtr<AndroidNativeWindow> native_window_;
std::shared_ptr<fml::ConcurrentMessageLoop> workers_;
std::shared_ptr<impeller::Context> impeller_context_;
bool is_valid_ = false;
