Skip to content

Commit

Permalink
dispatcher: Delay fd activation until the next itertion of the event …
Browse files Browse the repository at this point in the history
…loop. (#11750)

Processing injected fd events in the same loop they are generated can result in high-throughput connections proxying data multiple times per event loop iteration, effectively starving other connections and increasing small request latency.

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente authored Jun 28, 2020
1 parent 8614e83 commit 9898908
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 25 deletions.
7 changes: 7 additions & 0 deletions include/envoy/event/schedulable_cb.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class SchedulableCallback {
*/
virtual void scheduleCallbackCurrentIteration() PURE;

/**
* Schedule the callback so it runs in the next iteration of the event loop. There are no
* ordering guarantees for callbacks scheduled for the next iteration, not even among
* next-iteration callbacks.
*/
virtual void scheduleCallbackNextIteration() PURE;

/**
* Cancel pending execution of the callback.
*/
Expand Down
1 change: 1 addition & 0 deletions source/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ envoy_cc_library(
"//source/common/network:connection_lib",
"//source/common/network:dns_lib",
"//source/common/network:listener_lib",
"//source/common/runtime:runtime_features_lib",
],
)

Expand Down
71 changes: 59 additions & 12 deletions source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "common/common/assert.h"
#include "common/event/dispatcher_impl.h"
#include "common/runtime/runtime_features.h"

#include "event2/event.h"

Expand All @@ -12,31 +13,59 @@ namespace Event {

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, os_fd_t fd, FileReadyCb cb,
FileTriggerType trigger, uint32_t events)
: cb_(cb), fd_(fd), trigger_(trigger) {
: cb_(cb), fd_(fd), trigger_(trigger),
activate_fd_events_next_event_loop_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.activate_fds_next_event_loop")) {
#ifdef WIN32
RELEASE_ASSERT(trigger_ == FileTriggerType::Level,
"libevent does not support edge triggers on Windows");
#endif
assignEvents(events, &dispatcher.base());
event_add(&raw_event_, nullptr);
if (activate_fd_events_next_event_loop_) {
activation_cb_ = dispatcher.createSchedulableCallback([this]() {
ASSERT(injected_activation_events_ != 0);
mergeInjectedEventsAndRunCb(0);
});
}
}

void FileEventImpl::activate(uint32_t events) {
int libevent_events = 0;
if (events & FileReadyType::Read) {
libevent_events |= EV_READ;
}
// events is not empty.
ASSERT(events != 0);
// Only supported event types are set.
ASSERT((events & (FileReadyType::Read | FileReadyType::Write | FileReadyType::Closed)) == events);

if (!activate_fd_events_next_event_loop_) {
// Legacy implementation
int libevent_events = 0;
if (events & FileReadyType::Read) {
libevent_events |= EV_READ;
}

if (events & FileReadyType::Write) {
libevent_events |= EV_WRITE;
}

if (events & FileReadyType::Closed) {
libevent_events |= EV_CLOSED;
}

if (events & FileReadyType::Write) {
libevent_events |= EV_WRITE;
ASSERT(libevent_events);
event_active(&raw_event_, libevent_events, 0);
return;
}

if (events & FileReadyType::Closed) {
libevent_events |= EV_CLOSED;
// Schedule the activation callback so it runs as part of the next loop iteration if it is not
// already scheduled.
if (injected_activation_events_ == 0) {
ASSERT(!activation_cb_->enabled());
activation_cb_->scheduleCallbackNextIteration();
}
ASSERT(activation_cb_->enabled());

ASSERT(libevent_events);
event_active(&raw_event_, libevent_events, 0);
// Merge new events with pending injected events.
injected_activation_events_ |= events;
}

void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
Expand All @@ -63,17 +92,35 @@ void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
}

ASSERT(events != 0);
event->cb_(events);
event->mergeInjectedEventsAndRunCb(events);
},
this);
}

void FileEventImpl::setEnabled(uint32_t events) {
if (activate_fd_events_next_event_loop_ && injected_activation_events_ != 0) {
// Clear pending events on updates to the fd event mask to avoid delivering events that are no
// longer relevant. Updating the event mask will reset the fd edge trigger state so the proxy
// will be able to determine the fd read/write state without need for the injected activation
// events.
injected_activation_events_ = 0;
activation_cb_->cancel();
}

auto* base = event_get_base(&raw_event_);
event_del(&raw_event_);
assignEvents(events, base);
event_add(&raw_event_, nullptr);
}

void FileEventImpl::mergeInjectedEventsAndRunCb(uint32_t events) {
if (activate_fd_events_next_event_loop_ && injected_activation_events_ != 0) {
events |= injected_activation_events_;
injected_activation_events_ = 0;
activation_cb_->cancel();
}
cb_(events);
}

} // namespace Event
} // namespace Envoy
11 changes: 11 additions & 0 deletions source/common/event/file_event_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,21 @@ class FileEventImpl : public FileEvent, ImplBase {

private:
void assignEvents(uint32_t events, event_base* base);
void mergeInjectedEventsAndRunCb(uint32_t events);

FileReadyCb cb_;
os_fd_t fd_;
FileTriggerType trigger_;

// Injected FileReadyType events that were scheduled by recent calls to activate() and are pending
// delivery.
uint32_t injected_activation_events_{};
// Used to schedule delayed event activation. Armed iff pending_activation_events_ != 0.
SchedulableCallbackPtr activation_cb_;
// Latched "envoy.reloadable_features.activate_fds_next_event_loop" runtime feature. If true, fd
// events scheduled via activate are evaluated in the next iteration of the event loop after
// polling and activating new fd events.
const bool activate_fd_events_next_event_loop_;
};

} // namespace Event
Expand Down
10 changes: 10 additions & 0 deletions source/common/event/schedulable_cb_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,19 @@ SchedulableCallbackImpl::SchedulableCallbackImpl(Libevent::BasePtr& libevent,
}

void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
// event_active directly adds the event to the end of the work queue so it executes in the current
// iteration of the event loop.
event_active(&raw_event_, EV_TIMEOUT, 0);
}

void SchedulableCallbackImpl::scheduleCallbackNextIteration() {
// libevent computes the list of timers to move to the work list after polling for fd events, but
// iteration through the work list starts. Zero delay timers added while iterating through the
// work list execute on the next iteration of the event loop.
const timeval zero_tv{};
event_add(&raw_event_, &zero_tv);
}

void SchedulableCallbackImpl::cancel() { event_del(&raw_event_); }

bool SchedulableCallbackImpl::enabled() { return 0 != evtimer_pending(&raw_event_, nullptr); }
Expand Down
1 change: 1 addition & 0 deletions source/common/event/schedulable_cb_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SchedulableCallbackImpl : public SchedulableCallback, ImplBase {

// SchedulableCallback implementation.
void scheduleCallbackCurrentIteration() override;
void scheduleCallbackNextIteration() override;
void cancel() override;
bool enabled() override;

Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.strict_authority_validation",
"envoy.reloadable_features.reject_unsupported_transfer_encodings",
// Begin alphabetically sorted section.
"envoy.reloadable_features.activate_fds_next_event_loop",
"envoy.deprecated_features.allow_deprecated_extension_names",
"envoy.reloadable_features.disallow_unbounded_access_logs",
"envoy.reloadable_features.early_errors_via_hcm",
Expand Down
1 change: 1 addition & 0 deletions test/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ envoy_cc_test(
"//source/common/stats:isolated_store_lib",
"//test/mocks:common_lib",
"//test/test_common:environment_lib",
"//test/test_common:test_runtime_lib",
"//test/test_common:utility_lib",
],
)
51 changes: 46 additions & 5 deletions test/common/event/dispatcher_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SchedulableCallbackImplTest : public testing::Test {
}
};

TEST_F(SchedulableCallbackImplTest, ScheduleAndCancel) {
TEST_F(SchedulableCallbackImplTest, ScheduleCurrentAndCancel) {
ReadyWatcher watcher;

auto cb = dispatcher_->createSchedulableCallback([&]() { watcher.ready(); });
Expand All @@ -72,17 +72,50 @@ TEST_F(SchedulableCallbackImplTest, ScheduleAndCancel) {
dispatcher_->run(Dispatcher::RunType::Block);
}

TEST_F(SchedulableCallbackImplTest, ScheduleNextAndCancel) {
ReadyWatcher watcher;

auto cb = dispatcher_->createSchedulableCallback([&]() { watcher.ready(); });

// Cancel is a no-op if not scheduled.
cb->cancel();
dispatcher_->run(Dispatcher::RunType::Block);

// Callback is not invoked if cancelled before it executes.
cb->scheduleCallbackNextIteration();
EXPECT_TRUE(cb->enabled());
cb->cancel();
EXPECT_FALSE(cb->enabled());
dispatcher_->run(Dispatcher::RunType::Block);

// Scheduled callback executes.
cb->scheduleCallbackNextIteration();
EXPECT_CALL(watcher, ready());
dispatcher_->run(Dispatcher::RunType::Block);

// Callbacks implicitly cancelled if runner is deleted.
cb->scheduleCallbackNextIteration();
cb.reset();
dispatcher_->run(Dispatcher::RunType::Block);
}

TEST_F(SchedulableCallbackImplTest, ScheduleOrder) {
ReadyWatcher watcher0;
createCallback([&]() { watcher0.ready(); });
ReadyWatcher watcher1;
createCallback([&]() { watcher1.ready(); });
ReadyWatcher watcher2;
createCallback([&]() { watcher2.ready(); });

// Callback run in the order they are scheduled.
callbacks_[0]->scheduleCallbackCurrentIteration();
// Current iteration callbacks run in the order they are scheduled. Next iteration callbacks run
// after current iteration callbacks.
callbacks_[0]->scheduleCallbackNextIteration();
callbacks_[1]->scheduleCallbackCurrentIteration();
EXPECT_CALL(watcher0, ready());
callbacks_[2]->scheduleCallbackCurrentIteration();
InSequence s;
EXPECT_CALL(watcher1, ready());
EXPECT_CALL(watcher2, ready());
EXPECT_CALL(watcher0, ready());
dispatcher_->run(Dispatcher::RunType::Block);
}

Expand All @@ -103,6 +136,7 @@ TEST_F(SchedulableCallbackImplTest, ScheduleChainingAndCancellation) {
callbacks_[2]->scheduleCallbackCurrentIteration();
callbacks_[3]->scheduleCallbackCurrentIteration();
callbacks_[4]->scheduleCallbackCurrentIteration();
callbacks_[5]->scheduleCallbackNextIteration();
});

ReadyWatcher watcher2;
Expand All @@ -120,14 +154,21 @@ TEST_F(SchedulableCallbackImplTest, ScheduleChainingAndCancellation) {
ReadyWatcher watcher4;
createCallback([&]() { watcher4.ready(); });

ReadyWatcher watcher5;
createCallback([&]() { watcher5.ready(); });

// Chained callbacks run in the same event loop iteration, as signaled by a single call to
// prepare_watcher.ready(). watcher3 and watcher4 are not invoked because cb2 cancels
// cb3 and deletes cb4 as part of its execution.
// cb3 and deletes cb4 as part of its execution. cb5 runs after a second call to the
// prepare callback since it's scheduled for the next iteration.
callbacks_[0]->scheduleCallbackCurrentIteration();
InSequence s;
EXPECT_CALL(prepare_watcher, ready());
EXPECT_CALL(watcher0, ready());
EXPECT_CALL(watcher1, ready());
EXPECT_CALL(watcher2, ready());
EXPECT_CALL(prepare_watcher, ready());
EXPECT_CALL(watcher5, ready());
dispatcher_->run(Dispatcher::RunType::Block);
}

Expand Down
Loading

0 comments on commit 9898908

Please sign in to comment.