Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions fml/concurrent_message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

namespace fml {

std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
std::unique_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
size_t worker_count) {
return std::shared_ptr<ConcurrentMessageLoop>{
return std::unique_ptr<ConcurrentMessageLoop>{
new ConcurrentMessageLoop(worker_count)};
}

ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
: worker_count_(std::max<size_t>(worker_count, 1ul)) {
: worker_count_(std::max<size_t>(worker_count, 1ul)),
task_runner_(new ConcurrentTaskRunner(this)) {
for (size_t i = 0; i < worker_count_; ++i) {
workers_.emplace_back([i, this]() {
fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig(
Expand All @@ -33,6 +34,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
}

ConcurrentMessageLoop::~ConcurrentMessageLoop() {
{
std::scoped_lock lock(task_runner_->weak_loop_mutex_);
task_runner_->weak_loop_ = nullptr;
}
Terminate();
for (auto& worker : workers_) {
worker.join();
Expand All @@ -43,33 +48,18 @@ size_t ConcurrentMessageLoop::GetWorkerCount() const {
return worker_count_;
}

std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
}

void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
if (!task) {
return;
}

std::unique_lock lock(tasks_mutex_);

// Don't just drop tasks on the floor in case of shutdown.
if (shutdown_) {
FML_DLOG(WARNING)
<< "Tried to post a task to shutdown concurrent message "
"loop. The task will be executed on the callers thread.";
lock.unlock();
task();
return;
}

tasks_.push(task);

// Unlock the mutex before notifying the condition variable because that mutex
// has to be acquired on the other thread anyway. Waiting in this scope till
// it is acquired there is a pessimization.
lock.unlock();
{
std::unique_lock lock(tasks_mutex_);
tasks_.push(task);
}

tasks_condition_.notify_one();
}
Expand Down Expand Up @@ -148,9 +138,8 @@ std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
return pending_tasks;
}

ConcurrentTaskRunner::ConcurrentTaskRunner(
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
: weak_loop_(std::move(weak_loop)) {}
ConcurrentTaskRunner::ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop)
: weak_loop_(weak_loop) {}

ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;

Expand All @@ -159,15 +148,12 @@ void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
return;
}

if (auto loop = weak_loop_.lock()) {
loop->PostTask(task);
return;
{
std::scoped_lock lock(weak_loop_mutex_);
if (weak_loop_) {
weak_loop_->PostTask(task);
}
}

FML_DLOG(WARNING)
<< "Tried to post to a concurrent message loop that has already died. "
"Executing the task on the callers thread.";
task();
}

bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() {
Expand Down
15 changes: 9 additions & 6 deletions fml/concurrent_message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ namespace fml {

class ConcurrentTaskRunner;

class ConcurrentMessageLoop
: public std::enable_shared_from_this<ConcurrentMessageLoop> {
// This class is final for the logic in ~ConcurrentMessageLoop().
class ConcurrentMessageLoop final {
public:
static std::shared_ptr<ConcurrentMessageLoop> Create(
static std::unique_ptr<ConcurrentMessageLoop> Create(
size_t worker_count = std::thread::hardware_concurrency());

~ConcurrentMessageLoop();

size_t GetWorkerCount() const;

std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner();
std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner() { return task_runner_; }

void Terminate();

Expand All @@ -47,6 +47,7 @@ class ConcurrentMessageLoop
std::vector<std::thread::id> worker_thread_ids_;
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_;
bool shutdown_ = false;
std::shared_ptr<ConcurrentTaskRunner> task_runner_;

explicit ConcurrentMessageLoop(size_t worker_count);

Expand All @@ -63,7 +64,7 @@ class ConcurrentMessageLoop

class ConcurrentTaskRunner : public BasicTaskRunner {
public:
explicit ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop);
explicit ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop);

virtual ~ConcurrentTaskRunner();

Expand All @@ -72,7 +73,9 @@ class ConcurrentTaskRunner : public BasicTaskRunner {
private:
friend ConcurrentMessageLoop;

std::weak_ptr<ConcurrentMessageLoop> weak_loop_;
std::mutex weak_loop_mutex_;
// Raw pointer that is cleared out in ~ConcurrentMessageLoop.
ConcurrentMessageLoop* weak_loop_ FML_GUARDED_BY(weak_loop_mutex_);

FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner);
};
Expand Down
8 changes: 8 additions & 0 deletions fml/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@
TypeName() = delete; \
FML_DISALLOW_COPY_ASSIGN_AND_MOVE(TypeName)

#if defined(__clang__)
#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define FML_THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
#endif

#define FML_GUARDED_BY(x) FML_THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))

#endif // FLUTTER_FML_MACROS_H_
7 changes: 3 additions & 4 deletions impeller/playground/backend/vulkan/playground_impl_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ void PlaygroundImplVK::DestroyWindowHandle(WindowHandle handle) {
}

PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
: PlaygroundImpl(switches),
concurrent_loop_(fml::ConcurrentMessageLoop::Create()),
handle_(nullptr, &DestroyWindowHandle) {
: PlaygroundImpl(switches), handle_(nullptr, &DestroyWindowHandle) {
if (!::glfwVulkanSupported()) {
#ifdef TARGET_OS_MAC
VALIDATION_LOG << "Attempted to initialize a Vulkan playground on macOS "
Expand Down Expand Up @@ -83,7 +81,8 @@ PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
&::glfwGetInstanceProcAddress);
context_settings.shader_libraries_data = ShaderLibraryMappingsForPlayground();
context_settings.cache_directory = fml::paths::GetCachesDirectory();
context_settings.worker_task_runner = concurrent_loop_->GetTaskRunner();
context_settings.worker_concurrent_loop =
fml::ConcurrentMessageLoop::Create();
context_settings.enable_validation = switches_.enable_vulkan_validation;

auto context = ContextVK::Create(std::move(context_settings));
Expand Down
2 changes: 1 addition & 1 deletion impeller/playground/backend/vulkan/playground_impl_vk.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class PlaygroundImplVK final : public PlaygroundImpl {
~PlaygroundImplVK();

private:
std::shared_ptr<fml::ConcurrentMessageLoop> concurrent_loop_;
std::unique_ptr<fml::ConcurrentMessageLoop> concurrent_loop_;
std::shared_ptr<Context> context_;

// Windows management.
Expand Down
3 changes: 3 additions & 0 deletions impeller/renderer/backend/vulkan/command_pool_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ std::shared_ptr<CommandPoolVK> CommandPoolVK::GetThreadLocal(
}

void CommandPoolVK::ClearAllPools(const ContextVK* context) {
if (tls_command_pool.get()) {
tls_command_pool.get()->erase(context->GetHash());
}
Lock pool_lock(g_all_pools_mutex);
if (auto found = g_all_pools.find(context); found != g_all_pools.end()) {
for (auto& weak_pool : found->second) {
Expand Down
13 changes: 9 additions & 4 deletions impeller/renderer/backend/vulkan/context_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ ContextVK::~ContextVK() {
if (device_) {
[[maybe_unused]] auto result = device_->waitIdle();
}
// Delete `worker_message_loop_` to ensure that ~ConcurrentMessageLoop() is
// executed before the instance variables are deleted. This will synchronize
// on joining ConcurrentMessageLoop's threads.
worker_message_loop_.reset();
CommandPoolVK::ClearAllPools(this);
}

Expand Down Expand Up @@ -305,11 +309,12 @@ void ContextVK::Setup(Settings settings) {
//----------------------------------------------------------------------------
/// Setup the pipeline library.
///
worker_message_loop_ = std::move(settings.worker_concurrent_loop);
auto pipeline_library = std::shared_ptr<PipelineLibraryVK>(
new PipelineLibraryVK(device.value.get(), //
caps, //
std::move(settings.cache_directory), //
settings.worker_task_runner //
new PipelineLibraryVK(device.value.get(), //
caps, //
std::move(settings.cache_directory), //
worker_message_loop_->GetTaskRunner() //
));

if (!pipeline_library->IsValid()) {
Expand Down
3 changes: 2 additions & 1 deletion impeller/renderer/backend/vulkan/context_vk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
PFN_vkGetInstanceProcAddr proc_address_callback = nullptr;
std::vector<std::shared_ptr<fml::Mapping>> shader_libraries_data;
fml::UniqueFD cache_directory;
std::shared_ptr<fml::ConcurrentTaskRunner> worker_task_runner;
std::unique_ptr<fml::ConcurrentMessageLoop> worker_concurrent_loop;
bool enable_validation = false;

Settings() = default;
Expand Down Expand Up @@ -137,6 +137,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
std::shared_ptr<FenceWaiterVK> fence_waiter_;
std::string device_name_;
const uint64_t hash_;
std::unique_ptr<fml::ConcurrentMessageLoop> worker_message_loop_;

bool is_valid_ = false;

Expand Down
4 changes: 1 addition & 3 deletions impeller/renderer/backend/vulkan/test/mock_vulkan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,7 @@ PFN_vkVoidFunction GetMockVulkanProcAddress(VkInstance instance,

std::shared_ptr<ContextVK> CreateMockVulkanContext(void) {
ContextVK::Settings settings;
auto message_loop = fml::ConcurrentMessageLoop::Create();
settings.worker_task_runner =
std::make_shared<fml::ConcurrentTaskRunner>(message_loop);
settings.worker_concurrent_loop = fml::ConcurrentMessageLoop::Create();
settings.proc_address_callback = GetMockVulkanProcAddress;
return ContextVK::Create(std::move(settings));
}
Expand Down
10 changes: 5 additions & 5 deletions shell/platform/android/android_surface_vulkan_impeller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace flutter {

static std::shared_ptr<impeller::Context> CreateImpellerContext(
const fml::RefPtr<vulkan::VulkanProcTable>& proc_table,
const std::shared_ptr<fml::ConcurrentMessageLoop>& concurrent_loop,
std::unique_ptr<fml::ConcurrentMessageLoop> concurrent_loop,
bool enable_vulkan_validation) {
std::vector<std::shared_ptr<fml::Mapping>> shader_mappings = {
std::make_shared<fml::NonOwnedMapping>(impeller_entity_shaders_vk_data,
Expand All @@ -40,7 +40,7 @@ static std::shared_ptr<impeller::Context> CreateImpellerContext(
settings.proc_address_callback = instance_proc_addr;
settings.shader_libraries_data = std::move(shader_mappings);
settings.cache_directory = fml::paths::GetCachesDirectory();
settings.worker_task_runner = concurrent_loop->GetTaskRunner();
settings.worker_concurrent_loop = std::move(concurrent_loop);
settings.enable_validation = enable_vulkan_validation;
return impeller::ContextVK::Create(std::move(settings));
}
Expand All @@ -50,10 +50,10 @@ AndroidSurfaceVulkanImpeller::AndroidSurfaceVulkanImpeller(
const std::shared_ptr<PlatformViewAndroidJNI>& jni_facade,
bool enable_vulkan_validation)
: AndroidSurface(android_context),
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()),
workers_(fml::ConcurrentMessageLoop::Create()) {
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()) {
impeller_context_ =
CreateImpellerContext(proc_table_, workers_, enable_vulkan_validation);
CreateImpellerContext(proc_table_, fml::ConcurrentMessageLoop::Create(),
enable_vulkan_validation);
is_valid_ =
proc_table_->HasAcquiredMandatoryProcAddresses() && impeller_context_;
}
Expand Down
1 change: 0 additions & 1 deletion shell/platform/android/android_surface_vulkan_impeller.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class AndroidSurfaceVulkanImpeller : public AndroidSurface {
private:
fml::RefPtr<vulkan::VulkanProcTable> proc_table_;
fml::RefPtr<AndroidNativeWindow> native_window_;
std::shared_ptr<fml::ConcurrentMessageLoop> workers_;
std::shared_ptr<impeller::Context> impeller_context_;
bool is_valid_ = false;

Expand Down