Skip to content

Commit

Permalink
MessageLoopTaskQueue schedules Wakes (flutter#9316)
Browse files Browse the repository at this point in the history
* Refactor to move Task Queue to its own class

- This is to help with sharing task queue among
  multiple message loops going forward.

- currently there is 1:1 mapping between task queue
  and message loop, we are still maintaining the semantics
  for this change.

* Add mutex include

* Most of the waking up changes minus test failures

* Refactor MessageLoopImpl to be Wakeable

- Makes testing easier by letting us putting a TestWakeable

- Also move the waking up logic to the task queue

* add tests

* Fix formatting and license
  • Loading branch information
iskakaushik authored Jun 14, 2019
1 parent b9c790e commit 6f5347c
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 22 deletions.
1 change: 1 addition & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ FILE: ../../../flutter/fml/trace_event.h
FILE: ../../../flutter/fml/unique_fd.cc
FILE: ../../../flutter/fml/unique_fd.h
FILE: ../../../flutter/fml/unique_object.h
FILE: ../../../flutter/fml/wakeable.h
FILE: ../../../flutter/lib/io/dart_io.cc
FILE: ../../../flutter/lib/io/dart_io.h
FILE: ../../../flutter/lib/snapshot/libraries.json
Expand Down
1 change: 1 addition & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ source_set("fml") {
"unique_fd.cc",
"unique_fd.h",
"unique_object.h",
"wakeable.h",
]

public_deps = []
Expand Down
8 changes: 3 additions & 5 deletions fml/message_loop_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {

MessageLoopImpl::MessageLoopImpl() : terminated_(false) {
task_queue_ = std::make_unique<MessageLoopTaskQueue>();
task_queue_->SetWakeable(this);
}

MessageLoopImpl::~MessageLoopImpl() = default;
Expand All @@ -53,8 +54,7 @@ void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
// |task| synchronously within this function.
return;
}
const auto wake_up = task_queue_->RegisterTask(task, target_time);
WakeUp(wake_up);
task_queue_->RegisterTask(task, target_time);
}

void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
Expand Down Expand Up @@ -130,9 +130,7 @@ void MessageLoopImpl::FlushTasks(FlushType type) {
// gather invocations -> Swap -> execute invocations
// will lead us to run invocations on the wrong thread.
std::lock_guard<std::mutex> task_flush_lock(tasks_flushing_mutex_);

const auto wake_up = task_queue_->GetTasksToRunNow(type, invocations);
WakeUp(wake_up);
task_queue_->GetTasksToRunNow(type, invocations);

for (const auto& invocation : invocations) {
invocation();
Expand Down
6 changes: 3 additions & 3 deletions fml/message_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/time/time_point.h"
#include "flutter/fml/wakeable.h"

namespace fml {

class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {
class MessageLoopImpl : public Wakeable,
public fml::RefCountedThreadSafe<MessageLoopImpl> {
public:
static fml::RefPtr<MessageLoopImpl> Create();

Expand All @@ -33,8 +35,6 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {

virtual void Terminate() = 0;

virtual void WakeUp(fml::TimePoint time_point) = 0;

void PostTask(fml::closure task, fml::TimePoint target_time);

void AddTaskObserver(intptr_t key, fml::closure callback);
Expand Down
23 changes: 17 additions & 6 deletions fml/message_loop_task_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/message_loop_impl.h"

namespace fml {

Expand All @@ -17,19 +18,19 @@ void MessageLoopTaskQueue::Dispose() {
delayed_tasks_ = {};
}

fml::TimePoint MessageLoopTaskQueue::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
void MessageLoopTaskQueue::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_.push({++order_, std::move(task), target_time});
return delayed_tasks_.top().GetTargetTime();
WakeUp(delayed_tasks_.top().GetTargetTime());
}

bool MessageLoopTaskQueue::HasPendingTasks() {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
return !delayed_tasks_.empty();
}

fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
void MessageLoopTaskQueue::GetTasksToRunNow(
FlushType type,
std::vector<fml::closure>& invocations) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
Expand All @@ -48,9 +49,15 @@ fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
}

if (delayed_tasks_.empty()) {
return fml::TimePoint::Max();
WakeUp(fml::TimePoint::Max());
} else {
return delayed_tasks_.top().GetTargetTime();
WakeUp(delayed_tasks_.top().GetTargetTime());
}
}

void MessageLoopTaskQueue::WakeUp(fml::TimePoint time) {
if (wakeable_) {
wakeable_->WakeUp(time);
}
}

Expand Down Expand Up @@ -94,4 +101,8 @@ void MessageLoopTaskQueue::Swap(MessageLoopTaskQueue& other)
std::swap(delayed_tasks_, other.delayed_tasks_);
}

void MessageLoopTaskQueue::SetWakeable(fml::Wakeable* wakeable) {
wakeable_ = wakeable;
}

} // namespace fml
16 changes: 11 additions & 5 deletions fml/message_loop_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/wakeable.h"

namespace fml {

Expand All @@ -23,7 +24,8 @@ enum class FlushType {
};

// This class keeps track of all the tasks and observers that
// need to be run on it's MessageLoopImpl.
// need to be run on it's MessageLoopImpl. This also wakes up the
// loop at the required times.
class MessageLoopTaskQueue {
public:
// Lifecycle.
Expand All @@ -36,13 +38,11 @@ class MessageLoopTaskQueue {

// Tasks methods.

fml::TimePoint RegisterTask(fml::closure task, fml::TimePoint target_time);
void RegisterTask(fml::closure task, fml::TimePoint target_time);

bool HasPendingTasks();

// Returns the wake up time.
fml::TimePoint GetTasksToRunNow(FlushType type,
std::vector<fml::closure>& invocations);
void GetTasksToRunNow(FlushType type, std::vector<fml::closure>& invocations);

size_t GetNumPendingTasks();

Expand All @@ -58,7 +58,13 @@ class MessageLoopTaskQueue {

void Swap(MessageLoopTaskQueue& other);

void SetWakeable(fml::Wakeable* wakeable);

private:
void WakeUp(fml::TimePoint time);

Wakeable* wakeable_ = NULL;

std::mutex observers_mutex_;
std::map<intptr_t, fml::closure> task_observers_
FML_GUARDED_BY(observers_mutex_);
Expand Down
71 changes: 68 additions & 3 deletions fml/message_loop_task_queue_unittests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,37 @@
#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
#include "gtest/gtest.h"

class TestWakeable : public fml::Wakeable {
public:
using WakeUpCall = std::function<void(const fml::TimePoint)>;

TestWakeable(WakeUpCall call) : wake_up_call_(call) {}

void WakeUp(fml::TimePoint time_point) override { wake_up_call_(time_point); }

private:
WakeUpCall wake_up_call_;
};

TEST(MessageLoopTaskQueue, StartsWithNoPendingTasks) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
ASSERT_FALSE(task_queue->HasPendingTasks());
}

TEST(MessageLoopTaskQueue, RegisterOneTask) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
const auto time = fml::TimePoint::Max();
const auto wake_time = task_queue->RegisterTask([] {}, time);

auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
task_queue->SetWakeable(new TestWakeable(
[&time](fml::TimePoint wake_time) { ASSERT_TRUE(wake_time == time); }));

task_queue->RegisterTask([] {}, time);
ASSERT_TRUE(task_queue->HasPendingTasks());
ASSERT_TRUE(task_queue->GetNumPendingTasks() == 1);
ASSERT_TRUE(wake_time == time);
}

TEST(MessageLoopTaskQueue, RegisterTwoTasksAndCount) {
Expand Down Expand Up @@ -68,3 +85,51 @@ TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
task_queue->NotifyObservers();
ASSERT_TRUE(test_val == 0);
}

TEST(MessageLoopTaskQueue, WakeUpIndependentOfTime) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();

int num_wakes = 0;
task_queue->SetWakeable(new TestWakeable(
[&num_wakes](fml::TimePoint wake_time) { ++num_wakes; }));

task_queue->RegisterTask([]() {}, fml::TimePoint::Now());
task_queue->RegisterTask([]() {}, fml::TimePoint::Max());

ASSERT_TRUE(num_wakes == 2);
}

TEST(MessageLoopTaskQueue, WakeUpWithMaxIfNoInvocations) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
fml::AutoResetWaitableEvent ev;

task_queue->SetWakeable(new TestWakeable([&ev](fml::TimePoint wake_time) {
ASSERT_TRUE(wake_time == fml::TimePoint::Max());
ev.Signal();
}));

std::vector<fml::closure> invocations;
task_queue->GetTasksToRunNow(fml::FlushType::kAll, invocations);
ev.Wait();
}

TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) {
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
fml::CountDownLatch latch(2);

fml::TimePoint expected = fml::TimePoint::Max();

task_queue->SetWakeable(
new TestWakeable([&latch, &expected](fml::TimePoint wake_time) {
ASSERT_TRUE(wake_time == expected);
latch.CountDown();
}));

task_queue->RegisterTask([]() {}, fml::TimePoint::Max());

const auto now = fml::TimePoint::Now();
expected = now;
task_queue->RegisterTask([]() {}, now);

latch.Wait();
}
21 changes: 21 additions & 0 deletions fml/wakeable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef FLUTTER_FML_WAKEABLE_H_
#define FLUTTER_FML_WAKEABLE_H_

#include "flutter/fml/time/time_point.h"

namespace fml {

class Wakeable {
public:
virtual ~Wakeable() {}

virtual void WakeUp(fml::TimePoint time_point) = 0;
};

} // namespace fml

#endif // FLUTTER_FML_WAKEABLE_H_

0 comments on commit 6f5347c

Please sign in to comment.