Skip to content

Commit

Permalink
[Impeller] Maintain separate queues of GLES operations for each threa…
Browse files Browse the repository at this point in the history
…d in the reactor (flutter#56573)

Threads that add operations to the ReactorGLES assume that those operations will be executed serially.

But prior to this change, the ReactorGLES added all operations into one queue.  The reactor would then execute those operations on any thread that can react. This could cause operations that were added to the reactor on the raster thread to be submitted to the GPU on the IO thread (or vice versa).
The reactor does not wait for the GPU to finish execution of those operations.  So other operations added on the raster thread could be submitted by a reaction before the GPU has completed the operation that was submitted on the IO thread.

This PR ensures that operations added to the reactor on a given thread will be executed during a reaction on that same thread.  If the thread can not currently react, then the operations will be queued until the thread enables reactions.

This also adds a call to CommandBuffer::WaitUntilScheduled to ImageDecoderImpeller.  This ensures that the command buffer submitted on the IO thread is flushed before the image is returned.

Fixes flutter/flutter#158535
Fixes flutter/flutter#158388
Fixes flutter/flutter#158390
  • Loading branch information
jason-simmons authored Nov 15, 2024
1 parent 1ce397f commit 644f85e
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 24 deletions.
4 changes: 2 additions & 2 deletions impeller/renderer/backend/gles/command_buffer_gles.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ bool CommandBufferGLES::OnSubmitCommands(CompletionCallback callback) {
}

// |CommandBuffer|
void CommandBufferGLES::OnWaitUntilScheduled() {
reactor_->GetProcTable().Flush();
void CommandBufferGLES::OnWaitUntilCompleted() {
reactor_->GetProcTable().Finish();
}

// |CommandBuffer|
Expand Down
2 changes: 1 addition & 1 deletion impeller/renderer/backend/gles/command_buffer_gles.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CommandBufferGLES final : public CommandBuffer {
bool OnSubmitCommands(CompletionCallback callback) override;

// |CommandBuffer|
void OnWaitUntilScheduled() override;
void OnWaitUntilCompleted() override;

// |CommandBuffer|
std::shared_ptr<RenderPass> OnCreateRenderPass(RenderTarget target) override;
Expand Down
1 change: 1 addition & 0 deletions impeller/renderer/backend/gles/proc_table_gles.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ struct GLProc {
PROC(DrawElements); \
PROC(Enable); \
PROC(EnableVertexAttribArray); \
PROC(Finish); \
PROC(Flush); \
PROC(FramebufferRenderbuffer); \
PROC(FramebufferTexture2D); \
Expand Down
16 changes: 8 additions & 8 deletions impeller/renderer/backend/gles/reactor_gles.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ bool ReactorGLES::RemoveWorker(WorkerID worker) {
}

bool ReactorGLES::HasPendingOperations() const {
auto thread_id = std::this_thread::get_id();
Lock ops_lock(ops_mutex_);
return !ops_.empty();
auto it = ops_.find(thread_id);
return it != ops_.end() ? !it->second.empty() : false;
}

const ProcTableGLES& ReactorGLES::GetProcTable() const {
Expand Down Expand Up @@ -136,9 +138,10 @@ bool ReactorGLES::AddOperation(Operation operation) {
if (!operation) {
return false;
}
auto thread_id = std::this_thread::get_id();
{
Lock ops_lock(ops_mutex_);
ops_.emplace_back(std::move(operation));
ops_[thread_id].emplace_back(std::move(operation));
}
// Attempt a reaction if able but it is not an error if this isn't possible.
[[maybe_unused]] auto result = React();
Expand Down Expand Up @@ -191,10 +194,6 @@ bool ReactorGLES::React() {
}
TRACE_EVENT0("impeller", "ReactorGLES::React");
while (HasPendingOperations()) {
// Both the raster thread and the IO thread can flush queued operations.
// Ensure that execution of the ops is serialized.
Lock execution_lock(ops_execution_mutex_);

if (!ReactOnce()) {
return false;
}
Expand Down Expand Up @@ -279,10 +278,11 @@ bool ReactorGLES::FlushOps() {

// Do NOT hold the ops or handles locks while performing operations in case
// the ops enqueue more ops.
decltype(ops_) ops;
decltype(ops_)::mapped_type ops;
auto thread_id = std::this_thread::get_id();
{
Lock ops_lock(ops_mutex_);
std::swap(ops_, ops);
std::swap(ops_[thread_id], ops);
}
for (const auto& op : ops) {
TRACE_EVENT0("impeller", "ReactorGLES::Operation");
Expand Down
6 changes: 3 additions & 3 deletions impeller/renderer/backend/gles/reactor_gles.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ class ReactorGLES {

std::unique_ptr<ProcTableGLES> proc_table_;

Mutex ops_execution_mutex_;
mutable Mutex ops_mutex_;
std::vector<Operation> ops_ IPLR_GUARDED_BY(ops_mutex_);
std::map<std::thread::id, std::vector<Operation>> ops_ IPLR_GUARDED_BY(
ops_mutex_);

// Make sure the container is one where erasing items during iteration doesn't
// invalidate other iterators.
Expand All @@ -280,7 +280,7 @@ class ReactorGLES {
bool can_set_debug_labels_ = false;
bool is_valid_ = false;

bool ReactOnce() IPLR_REQUIRES(ops_execution_mutex_);
bool ReactOnce();

bool HasPendingOperations() const;

Expand Down
33 changes: 33 additions & 0 deletions impeller/renderer/backend/gles/test/reactor_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// found in the LICENSE file.

#include <memory>
#include "flutter/fml/synchronization/waitable_event.h"
#include "flutter/testing/testing.h" // IWYU pragma: keep
#include "gtest/gtest.h"
#include "impeller/renderer/backend/gles/handle_gles.h"
Expand Down Expand Up @@ -60,5 +61,37 @@ TEST(ReactorGLES, DeletesHandlesDuringShutdown) {
calls.end());
}

TEST(ReactorGLES, PerThreadOperationQueues) {
auto mock_gles = MockGLES::Init();
ProcTableGLES::Resolver resolver = kMockResolverGLES;
auto proc_table = std::make_unique<ProcTableGLES>(resolver);
auto worker = std::make_shared<TestWorker>();
auto reactor = std::make_shared<ReactorGLES>(std::move(proc_table));
reactor->AddWorker(worker);

bool op1_called = false;
EXPECT_TRUE(
reactor->AddOperation([&](const ReactorGLES&) { op1_called = true; }));

fml::AutoResetWaitableEvent event;
bool op2_called = false;
std::thread thread([&] {
EXPECT_TRUE(
reactor->AddOperation([&](const ReactorGLES&) { op2_called = true; }));
event.Wait();
EXPECT_TRUE(reactor->React());
});

// Reacting on the main thread should only run the main thread's operation.
EXPECT_TRUE(reactor->React());
EXPECT_TRUE(op1_called);
EXPECT_FALSE(op2_called);

// Reacting on the second thread will run the second thread's operation.
event.Signal();
thread.join();
EXPECT_TRUE(op2_called);
}

} // namespace testing
} // namespace impeller
2 changes: 1 addition & 1 deletion impeller/renderer/backend/metal/command_buffer_mtl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CommandBufferMTL final : public CommandBuffer {
bool OnSubmitCommands(CompletionCallback callback) override;

// |CommandBuffer|
void OnWaitUntilScheduled() override;
void OnWaitUntilCompleted() override;

// |CommandBuffer|
std::shared_ptr<RenderPass> OnCreateRenderPass(RenderTarget target) override;
Expand Down
2 changes: 1 addition & 1 deletion impeller/renderer/backend/metal/command_buffer_mtl.mm
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ static bool LogMTLCommandBufferErrorIfPresent(id<MTLCommandBuffer> buffer) {
return true;
}

void CommandBufferMTL::OnWaitUntilScheduled() {}
void CommandBufferMTL::OnWaitUntilCompleted() {}

std::shared_ptr<RenderPass> CommandBufferMTL::OnCreateRenderPass(
RenderTarget target) {
Expand Down
2 changes: 1 addition & 1 deletion impeller/renderer/backend/vulkan/command_buffer_vk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ bool CommandBufferVK::OnSubmitCommands(CompletionCallback callback) {
FML_UNREACHABLE()
}

void CommandBufferVK::OnWaitUntilScheduled() {}
void CommandBufferVK::OnWaitUntilCompleted() {}

std::shared_ptr<RenderPass> CommandBufferVK::OnCreateRenderPass(
RenderTarget target) {
Expand Down
2 changes: 1 addition & 1 deletion impeller/renderer/backend/vulkan/command_buffer_vk.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class CommandBufferVK final
bool OnSubmitCommands(CompletionCallback callback) override;

// |CommandBuffer|
void OnWaitUntilScheduled() override;
void OnWaitUntilCompleted() override;

// |CommandBuffer|
std::shared_ptr<RenderPass> OnCreateRenderPass(RenderTarget target) override;
Expand Down
4 changes: 2 additions & 2 deletions impeller/renderer/command_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ bool CommandBuffer::SubmitCommands() {
return SubmitCommands(nullptr);
}

void CommandBuffer::WaitUntilScheduled() {
return OnWaitUntilScheduled();
void CommandBuffer::WaitUntilCompleted() {
return OnWaitUntilCompleted();
}

std::shared_ptr<RenderPass> CommandBuffer::CreateRenderPass(
Expand Down
7 changes: 4 additions & 3 deletions impeller/renderer/command_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ class CommandBuffer {
virtual void SetLabel(std::string_view label) const = 0;

//----------------------------------------------------------------------------
/// @brief Force execution of pending GPU commands.
/// @brief Block the current thread until the GPU has completed execution
/// of the commands.
///
void WaitUntilScheduled();
void WaitUntilCompleted();

//----------------------------------------------------------------------------
/// @brief Create a render pass to record render commands into.
Expand Down Expand Up @@ -102,7 +103,7 @@ class CommandBuffer {

[[nodiscard]] virtual bool OnSubmitCommands(CompletionCallback callback) = 0;

virtual void OnWaitUntilScheduled() = 0;
virtual void OnWaitUntilCompleted() = 0;

virtual std::shared_ptr<ComputePass> OnCreateComputePass() = 0;

Expand Down
2 changes: 1 addition & 1 deletion impeller/renderer/testing/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class MockCommandBuffer : public CommandBuffer {
OnSubmitCommands,
(CompletionCallback callback),
(override));
MOCK_METHOD(void, OnWaitUntilScheduled, (), (override));
MOCK_METHOD(void, OnWaitUntilCompleted, (), (override));
MOCK_METHOD(std::shared_ptr<ComputePass>,
OnCreateComputePass,
(),
Expand Down
4 changes: 4 additions & 0 deletions lib/ui/painting/image_decoder_impeller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ ImageDecoderImpeller::UnsafeUploadTextureToPrivate(
return std::make_pair(nullptr, decode_error);
}

// Flush the pending command buffer to ensure that its output becomes visible
// to the raster thread.
command_buffer->WaitUntilCompleted();

context->DisposeThreadLocalCachedResources();

return std::make_pair(
Expand Down

0 comments on commit 644f85e

Please sign in to comment.