Skip to content

Commit

Permalink
Overload: Reset expensive streams using byte accounting (envoyproxy#1…
Browse files Browse the repository at this point in the history
…7702)

Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
  • Loading branch information
KBaichoo authored Sep 1, 2021
1 parent d001296 commit 7bf466c
Show file tree
Hide file tree
Showing 20 changed files with 567 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ The following overload actions are supported:
- Envoy will reduce the waiting period for a configured set of timeouts. See
:ref:`below <config_overload_manager_reducing_timeouts>` for details on configuration.

* - envoy.overload_actions.reset_high_memory_stream
- Envoy will reset expensive streams to terminate them. See
:ref:`below <config_overload_manager_reset_streams>` for details on configuration.

.. _config_overload_manager_reducing_timeouts:

Reducing timeouts
Expand Down Expand Up @@ -163,6 +167,85 @@ all listeners.

An example configuration can be found in the :ref:`edge best practices document <best_practices_edge>`.

.. _config_overload_manager_reset_streams:

Reset Streams
^^^^^^^^^^^^^

.. warning::
Resetting streams via an overload action currently only works with HTTP2.

The ``envoy.overload_actions.reset_high_memory_stream`` overload action will reset
expensive streams. This requires `minimum_account_to_track_power_of_two` to be
configured via :ref:`buffer_factory_config
<envoy_v3_api_field_config.overload.v3.OverloadManager.buffer_factory_config>`.
To understand the memory class scheme in detail see :ref:`minimum_account_to_track_power_of_two
<envoy_v3_api_field_config.overload.v3.BufferFactoryConfig.minimum_account_to_track_power_of_two>`

As an example, here is a partial Overload Manager configuration with minimum
threshold for tracking and a single overload action entry that resets streams:

.. code-block:: yaml
buffer_factory_config:
minimum_account_to_track_power_of_two: 20
actions:
name: "envoy.overload_actions.reset_high_memory_stream"
triggers:
- name: "envoy.resource_monitors.fixed_heap"
scaled:
scaling_threshold: 0.85
saturation_threshold: 0.95
...
We will only track streams using >=
:math:`2^minimum_account_to_track_power_of_two` worth of allocated memory in
buffers. In this case, by setting the `minimum_account_to_track_power_of_two`
to `20` we will track streams using >= 1MiB since :math:`2^20` is 1MiB. Streams
using >= 1MiB will be classified into 8 power of two sized buckets. Currently,
the number of buckets is hardcoded to 8. For this example, the buckets are as
follows:

.. list-table::
:header-rows: 1
:widths: 1, 2

* - Bucket index
- Contains streams using
* - 0
- [1MiB,2MiB)
* - 1
- [2MiB,4MiB)
* - 2
- [4MiB,8MiB)
* - 3
- [8MiB,16MiB)
* - 4
- [16MiB,32MiB)
* - 5
- [32MiB,64MiB)
* - 6
- [64MiB,128MiB)
* - 7
- >= 128MiB

The above configuration also configures the overload manager to reset our tracked
streams based on heap usage as a trigger. When the heap usage is less than 85%,
no streams will be reset. When heap usage is at or above 85%, we start to
reset buckets according to the strategy described below. When the heap
usage is at 95% all streams using >= 1MiB memory are eligible for reset.

Given that there are only 8 buckets, we partition the space with a gradation of
:math:`gradation = (saturation_threshold - scaling_threshold)/8`. Hence at 85%
heap usage we reset streams in the last bucket e.g. those using `>= 128MiB`. At
:math:`85% + 1 * gradation` heap usage we reset streams in the last two buckets
e.g. those using `>= 64MiB`. And so forth as the heap usage is higher.

It's expected that the first few gradations shouldn't trigger anything, unless
there's something seriously wrong e.g. in this example streams using `>=
128MiB` in buffers.


Statistics
----------

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ New Features
* http: sanitizing the referer header as documented :ref:`here <config_http_conn_man_headers_referer>`. This feature can be temporarily turned off by setting runtime guard ``envoy.reloadable_features.sanitize_http_header_referer`` to false.
* jwt_authn: added support for :ref:`Jwt Cache <envoy_v3_api_field_extensions.filters.http.jwt_authn.v3.JwtProvider.jwt_cache_config>` and its size can be specified by :ref:`jwt_cache_size <envoy_v3_api_field_extensions.filters.http.jwt_authn.v3.JwtCacheConfig.jwt_cache_size>`.
* listener: new listener metric ``downstream_cx_transport_socket_connect_timeout`` to track transport socket timeouts.
* overload: add a new overload action that resets streams using a lot of memory. To enable the tracking of allocated bytes in buffers that a stream is using we need to configure the minimum threshold for tracking via:ref:`buffer_factory_config <envoy_v3_api_field_config.overload.v3.OverloadManager.buffer_factory_config>`. We have an overload action ``Envoy::Server::OverloadActionNameValues::ResetStreams`` that takes advantage of the tracking to reset the most expensive stream first.
* rbac: added :ref:`destination_port_range <envoy_v3_api_field_config.rbac.v3.Permission.destination_port_range>` for matching range of destination ports.
* route config: added :ref:`dynamic_metadata <envoy_v3_api_field_config.route.v3.RouteMatch.dynamic_metadata>` for routing based on dynamic metadata.
* thrift_proxy: added support for :ref:`mirroring requests <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.RouteAction.request_mirror_policies>`.
Expand Down
13 changes: 12 additions & 1 deletion envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,20 @@ class WatermarkFactory {
*
* @param reset_handler supplies the stream_reset_handler the account will
* invoke to reset the stream.
* @return a BufferMemoryAccountSharedPtr of the newly created account.
* @return a BufferMemoryAccountSharedPtr of the newly created account or
* nullptr if tracking is disabled.
*/
virtual BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) PURE;

/**
* Goes through the tracked accounts, resetting the accounts and their
* corresponding stream depending on the pressure.
*
* @param pressure scaled threshold pressure used to compute the buckets to
* reset internally.
* @return the number of streams reset
*/
virtual uint64_t resetAccountsGivenPressure(float pressure) PURE;
};

using WatermarkFactoryPtr = std::unique_ptr<WatermarkFactory>;
Expand Down
14 changes: 14 additions & 0 deletions envoy/server/overload/overload_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,24 @@ class OverloadActionNameValues {

// Overload action to reduce some subset of configured timeouts.
const std::string ReduceTimeouts = "envoy.overload_actions.reduce_timeouts";

// Overload action to reset streams using excessive memory.
const std::string ResetStreams = "envoy.overload_actions.reset_high_memory_stream";
};

using OverloadActionNames = ConstSingleton<OverloadActionNameValues>;

/**
* Well-known overload action stats.
*/
class OverloadActionStatsNameValues {
public:
// Count of ther number of streams the reset streams action has reset
const std::string ResetStreamsCount = "envoy.overload_actions.reset_high_memory_stream.count";
};

using OverloadActionStatsNames = ConstSingleton<OverloadActionStatsNameValues>;

/**
* The OverloadManager protects the Envoy instance from being overwhelmed by client
* requests. It monitors a set of resources and notifies registered listeners if
Expand Down
82 changes: 62 additions & 20 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include "source/common/buffer/watermark_buffer.h"
#include "watermark_buffer.h"

#include <cstdint>
#include <memory>

#include "envoy/buffer/buffer.h"

#include "source/common/common/assert.h"
#include "source/common/common/logger.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
Expand Down Expand Up @@ -147,38 +150,77 @@ void WatermarkBuffer::checkHighAndOverflowWatermarks() {

BufferMemoryAccountSharedPtr
WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) {
if (bitshift_ == kEffectivelyDisableTrackingBitshift) {
return nullptr; // No tracking
}
return BufferMemoryAccountImpl::createAccount(this, reset_handler);
}

void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account,
int current_class, int new_class) {
absl::optional<uint32_t> current_class,
absl::optional<uint32_t> new_class) {
ASSERT(current_class != new_class, "Expected the current_class and new_class to be different");

if (current_class == -1 && new_class >= 0) {
if (!current_class.has_value()) {
// Start tracking
ASSERT(!size_class_account_sets_[new_class].contains(account));
size_class_account_sets_[new_class].insert(account);
} else if (current_class >= 0 && new_class == -1) {
ASSERT(new_class.has_value());
ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
size_class_account_sets_[new_class.value()].insert(account);
} else if (!new_class.has_value()) {
// No longer track
ASSERT(size_class_account_sets_[current_class].contains(account));
size_class_account_sets_[current_class].erase(account);
ASSERT(current_class.has_value());
ASSERT(size_class_account_sets_[current_class.value()].contains(account));
size_class_account_sets_[current_class.value()].erase(account);
} else {
// Moving between buckets
ASSERT(size_class_account_sets_[current_class].contains(account));
ASSERT(!size_class_account_sets_[new_class].contains(account));
size_class_account_sets_[new_class].insert(
std::move(size_class_account_sets_[current_class].extract(account).value()));
ASSERT(size_class_account_sets_[current_class.value()].contains(account));
ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
size_class_account_sets_[new_class.value()].insert(
std::move(size_class_account_sets_[current_class.value()].extract(account).value()));
}
}

void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
int current_class) {
if (current_class >= 0) {
ASSERT(size_class_account_sets_[current_class].contains(account));
size_class_account_sets_[current_class].erase(account);
absl::optional<uint32_t> current_class) {
if (current_class.has_value()) {
ASSERT(size_class_account_sets_[current_class.value()].contains(account));
size_class_account_sets_[current_class.value()].erase(account);
}
}

uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1].");

// Compute buckets to clear
const uint32_t buckets_to_clear = std::min<uint32_t>(
std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
uint32_t bucket_idx = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear;

ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", bucket_idx);
uint64_t num_streams_reset = 0;
// TODO(kbaichoo): Add a limit to the number of streams we reset
// per-invocation of this function.
// Clear buckets
while (bucket_idx < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) {
ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.",
size_class_account_sets_[bucket_idx].size(), bucket_idx);

auto it = size_class_account_sets_[bucket_idx].begin();
while (it != size_class_account_sets_[bucket_idx].end()) {
auto next = std::next(it);
// This will trigger an erase, which avoids rehashing and invalidates the
// iterator *it*. *next* is still valid.
(*it)->resetDownstream();
it = next;
++num_streams_reset;
}

++bucket_idx;
}

return num_streams_reset;
}

WatermarkBufferFactory::WatermarkBufferFactory(
const envoy::config::overload::v3::BufferFactoryConfig& config)
: bitshift_(config.minimum_account_to_track_power_of_two()
Expand All @@ -205,19 +247,19 @@ BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory,
return account;
}

int BufferMemoryAccountImpl::balanceToClassIndex() {
absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() {
const uint64_t shifted_balance = buffer_memory_allocated_ >> factory_->bitshift();

if (shifted_balance == 0) {
return -1; // Not worth tracking anything < configured minimum threshold
return {}; // Not worth tracking anything < configured minimum threshold
}

const int class_idx = absl::bit_width(shifted_balance) - 1;
return std::min<int>(class_idx, NUM_MEMORY_CLASSES_ - 1);
return std::min<uint32_t>(class_idx, NUM_MEMORY_CLASSES_ - 1);
}

void BufferMemoryAccountImpl::updateAccountClass() {
const int new_class = balanceToClassIndex();
auto new_class = balanceToClassIndex();
if (shared_this_ && new_class != current_bucket_idx_) {
factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class);
current_bucket_idx_ = new_class;
Expand All @@ -241,7 +283,7 @@ void BufferMemoryAccountImpl::clearDownstream() {
if (reset_handler_.has_value()) {
reset_handler_.reset();
factory_->unregisterAccount(shared_this_, current_bucket_idx_);
current_bucket_idx_ = -1;
current_bucket_idx_.reset();
shared_this_ = nullptr;
}
}
Expand Down
22 changes: 14 additions & 8 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount {
static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory,
Http::StreamResetHandler& reset_handler);
~BufferMemoryAccountImpl() override {
// The buffer_memory_allocated_ should always be zero on destruction, even
// if we triggered a reset of the downstream. This is because the destructor
// will only trigger when no entities have a pointer to the account, meaning
// any slices which charge and credit the account should have credited the
// account when they were deleted, maintaining this invariant.
ASSERT(buffer_memory_allocated_ == 0);
ASSERT(!reset_handler_.has_value());
}
Expand Down Expand Up @@ -129,15 +134,13 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount {
// Returns the class index based off of the buffer_memory_allocated_
// This can differ with current_bucket_idx_ if buffer_memory_allocated_ was
// just modified.
// The class indexes returned are based on buckets of powers of two, if the
// account is above a minimum threshold. Returned class index range is [-1,
// NUM_MEMORY_CLASSES_).
int balanceToClassIndex();
// Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_).
absl::optional<uint32_t> balanceToClassIndex();
void updateAccountClass();

uint64_t buffer_memory_allocated_ = 0;
// Current bucket index where the account is being tracked in.
int current_bucket_idx_ = -1;
absl::optional<uint32_t> current_bucket_idx_{};

WatermarkBufferFactory* factory_ = nullptr;

Expand Down Expand Up @@ -195,16 +198,19 @@ class WatermarkBufferFactory : public WatermarkFactory {
}

BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override;
uint64_t resetAccountsGivenPressure(float pressure) override;

// Called by BufferMemoryAccountImpls created by the factory on account class
// updated.
void updateAccountClass(const BufferMemoryAccountSharedPtr& account, int current_class,
int new_class);
void updateAccountClass(const BufferMemoryAccountSharedPtr& account,
absl::optional<uint32_t> current_class,
absl::optional<uint32_t> new_class);

uint32_t bitshift() const { return bitshift_; }

// Unregister a buffer memory account.
virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class);
virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account,
absl::optional<uint32_t> current_class);

protected:
// Enable subclasses to inspect the mapping.
Expand Down
16 changes: 6 additions & 10 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,12 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod

ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());

// Set the account to start accounting if enabled. This is still a
// work-in-progress, and will be removed when other features using the
// accounting are implemented.
Buffer::BufferMemoryAccountSharedPtr downstream_stream_account;
if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) {
// Create account, wiring the stream to use it.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
}
// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
std::move(downstream_stream_account)));

Expand Down
2 changes: 0 additions & 2 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ constexpr const char* disabled_runtime_features[] = {
"envoy.reloadable_features.remove_legacy_json",
// Sentinel and test flag.
"envoy.reloadable_features.test_feature_false",
// TODO(kbaichoo): Remove when this is no longer test only.
"envoy.test_only.per_stream_buffer_accounting",
// Allows the use of ExtensionWithMatcher to wrap a HTTP filter with a match tree.
"envoy.reloadable_features.experimental_matching_api",
// When the runtime is flipped to true, use shared cache in getOrCreateRawAsyncClient method if
Expand Down
Loading

0 comments on commit 7bf466c

Please sign in to comment.