Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outlier detection framework #142

Merged
merged 4 commits into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "envoy/stats/stats_macros.h"
#include "envoy/upstream/outlier_detection.h"

namespace Upstream {

Expand Down Expand Up @@ -42,6 +43,11 @@ class HostDescription {
*/
virtual const Cluster& cluster() const PURE;

/**
* @return the host's outlier detection sink.
*/
virtual OutlierDetectorHostSink& outlierDetector() const PURE;

/**
* @return the URL used to connect to the host.
*/
Expand Down
53 changes: 53 additions & 0 deletions include/envoy/upstream/outlier_detection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once

#include "envoy/common/pure.h"

namespace Upstream {

class Host;
typedef std::shared_ptr<Host> HostPtr;

/**
* Sink for per host data. Proxy filters should send pertinent data when available.
*/
class OutlierDetectorHostSink {
public:
virtual ~OutlierDetectorHostSink() {}

/**
* Add an HTTP response code for a host.
*/
virtual void putHttpResponseCode(uint64_t code) PURE;

/**
* Add a response time for a host (in this case response time is generic and might be used for
* different operations including HTTP, Mongo, Redis, etc.).
*/
virtual void putResponseTime(std::chrono::milliseconds time) PURE;
};

typedef std::unique_ptr<OutlierDetectorHostSink> OutlierDetectorHostSinkPtr;

/**
* Interface for an outlier detection engine. Uses per host data to determine which hosts in a
* cluster are outliers and should be ejected.
*/
class OutlierDetector {
public:
virtual ~OutlierDetector() {}

/**
* Outlier detection change state callback.
*/
typedef std::function<void(HostPtr host)> ChangeStateCb;

/**
* Add a changed state callback to the detector. The callback will be called whenever any host
* changes state (either ejected or brought back in) due to outlier status.
*/
virtual void addChangedStateCb(ChangeStateCb cb) PURE;
};

typedef std::unique_ptr<OutlierDetector> OutlierDetectorPtr;

} // Upstream
17 changes: 12 additions & 5 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

namespace Upstream {

class Host;
typedef std::shared_ptr<Host> HostPtr;
typedef std::shared_ptr<const Host> ConstHostPtr;

/**
* An upstream host.
*/
Expand All @@ -24,7 +20,9 @@ class Host : virtual public HostDescription {

enum class HealthFlag {
// The host is currently failing active health checks.
FAILED_ACTIVE_HC = 0x1
FAILED_ACTIVE_HC = 0x1,
// The host is currently considered an outlier and has been ejected.
FAILED_OUTLIER_CHECK = 0x02
};

/**
Expand Down Expand Up @@ -69,6 +67,13 @@ class Host : virtual public HostDescription {
*/
virtual bool healthy() const PURE;

/**
* Set the host's outlier detector. Outlier detectors are assumed to be thread safe, however
* a new outlier detector must be installed before the host is used across threads. Thus,
* this routine should only be called on the main thread before the host is used across threads.
*/
virtual void setOutlierDetector(OutlierDetectorHostSinkPtr&& outlier_detector) PURE;

/**
* @return the current load balancing weight of the host, in the range 1-100.
*/
Expand All @@ -80,6 +85,8 @@ class Host : virtual public HostDescription {
virtual void weight(uint32_t new_weight) PURE;
};

typedef std::shared_ptr<const Host> ConstHostPtr;

/**
* Base host set interface. This is used both for clusters, as well as per thread/worker host sets
* used during routing/forwarding.
Expand Down
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ add_library(
upstream/host_utility.cc
upstream/load_balancer_impl.cc
upstream/logical_dns_cluster.cc
upstream/outlier_detection_impl.cc
upstream/sds.cc
upstream/upstream_impl.cc
${gen_git_sha_target})
Expand Down
51 changes: 24 additions & 27 deletions source/common/http/codes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,30 @@ void CodeUtility::chargeResponseStat(const ResponseStatInfo& info) {
}

void CodeUtility::chargeResponseTiming(const ResponseTimingInfo& info) {
if (DateUtil::timePointValid(info.request_send_time_)) {
std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - info.request_send_time_);

info.store_.deliverTimingToSinks(info.prefix_ + "upstream_rq_time", ms);
if (info.upstream_canary_) {
info.store_.deliverTimingToSinks(info.prefix_ + "canary.upstream_rq_time", ms);
}

if (info.internal_request_) {
info.store_.deliverTimingToSinks(info.prefix_ + "internal.upstream_rq_time", ms);
} else {
info.store_.deliverTimingToSinks(info.prefix_ + "external.upstream_rq_time", ms);
}

if (!info.request_vcluster_name_.empty()) {
info.store_.deliverTimingToSinks("vhost." + info.request_vhost_name_ + ".vcluster." +
info.request_vcluster_name_ + ".upstream_rq_time",
ms);
}

// Handle per zone stats.
if (!info.from_zone_.empty() && !info.to_zone_.empty()) {
info.store_.deliverTimingToSinks(fmt::format("{}zone.{}.{}.upstream_rq_time", info.prefix_,
info.from_zone_, info.to_zone_),
ms);
}
info.store_.deliverTimingToSinks(info.prefix_ + "upstream_rq_time", info.response_time_);
if (info.upstream_canary_) {
info.store_.deliverTimingToSinks(info.prefix_ + "canary.upstream_rq_time", info.response_time_);
}

if (info.internal_request_) {
info.store_.deliverTimingToSinks(info.prefix_ + "internal.upstream_rq_time",
info.response_time_);
} else {
info.store_.deliverTimingToSinks(info.prefix_ + "external.upstream_rq_time",
info.response_time_);
}

if (!info.request_vcluster_name_.empty()) {
info.store_.deliverTimingToSinks("vhost." + info.request_vhost_name_ + ".vcluster." +
info.request_vcluster_name_ + ".upstream_rq_time",
info.response_time_);
}

// Handle per zone stats.
if (!info.from_zone_.empty() && !info.to_zone_.empty()) {
info.store_.deliverTimingToSinks(
fmt::format("{}zone.{}.{}.upstream_rq_time", info.prefix_, info.from_zone_, info.to_zone_),
info.response_time_);
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/common/http/codes.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "envoy/common/time.h"
#include "envoy/http/codes.h"
#include "envoy/http/header_map.h"
#include "envoy/stats/stats.h"
Expand Down Expand Up @@ -40,7 +39,7 @@ class CodeUtility {
struct ResponseTimingInfo {
Stats::Store& store_;
const std::string& prefix_;
SystemTime request_send_time_;
std::chrono::milliseconds response_time_;
bool upstream_canary_;
bool internal_request_;
const std::string& request_vhost_name_;
Expand Down
22 changes: 15 additions & 7 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ void Filter::chargeUpstreamCode(const Http::HeaderMap& response_headers) {
bool is_canary = (response_headers.get(Http::Headers::get().EnvoyUpstreamCanary) == "true") ||
(upstream_host_ ? upstream_host_->canary() : false);

if (upstream_host_) {
upstream_host_->outlierDetector().putHttpResponseCode(
Http::Utility::getResponseStatus(response_headers));
}

Http::CodeUtility::ResponseStatInfo info{
config_.stats_store_, stat_prefix_, response_headers,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true",
Expand Down Expand Up @@ -430,11 +435,16 @@ void Filter::onUpstreamComplete() {
upstream_request_->upstream_encoder_->resetStream();
}

if (config_.emit_dynamic_stats_ && !callbacks_->requestInfo().healthCheck()) {
if (config_.emit_dynamic_stats_ && !callbacks_->requestInfo().healthCheck() &&
DateUtil::timePointValid(upstream_request_->upstream_encoder_->requestCompleteTime())) {
std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() -
upstream_request_->upstream_encoder_->requestCompleteTime());

upstream_host_->outlierDetector().putResponseTime(response_time);

Http::CodeUtility::ResponseTimingInfo info{
config_.stats_store_, stat_prefix_,
upstream_request_->upstream_encoder_->requestCompleteTime(),
upstream_request_->upstream_canary_,
config_.stats_store_, stat_prefix_, response_time, upstream_request_->upstream_canary_,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true",
route_->virtualHostName(), request_vcluster_ ? request_vcluster_->name() : "",
config_.service_zone_, upstreamZone()};
Expand All @@ -443,9 +453,7 @@ void Filter::onUpstreamComplete() {

for (const std::string& alt_prefix : alt_stat_prefixes_) {
Http::CodeUtility::ResponseTimingInfo info{
config_.stats_store_, alt_prefix,
upstream_request_->upstream_encoder_->requestCompleteTime(),
upstream_request_->upstream_canary_,
config_.stats_store_, alt_prefix, response_time, upstream_request_->upstream_canary_,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", "", "",
config_.service_zone_, upstreamZone()};

Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, Stats::Store&
}
}

new_cluster->setOutlierDetector(OutlierDetectorImplFactory::createForCluster(
*new_cluster, cluster, dns_resolver.dispatcher()));
primary_clusters_.emplace(new_cluster->name(), new_cluster);
}

Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class LogicalDnsCluster : public ClusterImplBase {
// Upstream:HostDescription
bool canary() const override { return false; }
const Cluster& cluster() const override { return logical_host_->cluster(); }
OutlierDetectorHostSink& outlierDetector() const override {
return logical_host_->outlierDetector();
}
const HostStats& stats() const override { return logical_host_->stats(); }
const std::string& url() const override { return url_; }
const std::string& zone() const override { return EMPTY_STRING; }
Expand Down
44 changes: 44 additions & 0 deletions source/common/upstream/outlier_detection_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "outlier_detection_impl.h"

#include "common/common/assert.h"

namespace Upstream {

OutlierDetectorPtr OutlierDetectorImplFactory::createForCluster(Cluster& cluster,
const Json::Object& cluster_config,
Event::Dispatcher& dispatcher) {
// Right now we don't support any configuration but in order to make the config backwards
// compatible we just look for an empty object.
if (cluster_config.hasObject("outlier_detection")) {
return OutlierDetectorPtr{new OutlierDetectorImpl(cluster, dispatcher)};
} else {
return nullptr;
}
}

OutlierDetectorImpl::OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher&) {
for (HostPtr host : cluster.hosts()) {
addHostSink(host);
}

cluster.addMemberUpdateCb([this](const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed) -> void {
for (HostPtr host : hosts_added) {
addHostSink(host);
}

for (HostPtr host : hosts_removed) {
ASSERT(host_sinks_.count(host) == 1);
host_sinks_.erase(host);
}
});
}

void OutlierDetectorImpl::addHostSink(HostPtr host) {
ASSERT(host_sinks_.count(host) == 0);
OutlierDetectorHostSinkImpl* sink = new OutlierDetectorHostSinkImpl();
host_sinks_[host] = sink;
host->setOutlierDetector(OutlierDetectorHostSinkPtr{sink});
}

} // Upstream
58 changes: 58 additions & 0 deletions source/common/upstream/outlier_detection_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "envoy/upstream/outlier_detection.h"
#include "envoy/upstream/upstream.h"

#include "common/json/json_loader.h"

namespace Upstream {

/**
* Null host sink implementation.
*/
class OutlierDetectorHostSinkNullImpl : public OutlierDetectorHostSink {
public:
// Upstream::OutlierDetectorHostSink
void putHttpResponseCode(uint64_t) override {}
void putResponseTime(std::chrono::milliseconds) override {}
};

/**
* Factory for creating a detector from a JSON configuration.
*/
class OutlierDetectorImplFactory {
public:
static OutlierDetectorPtr createForCluster(Cluster& cluster, const Json::Object& cluster_config,
Event::Dispatcher& dispatcher);
};

/**
* Implementation of OutlierDetectorHostSink for the generic detector.
*/
class OutlierDetectorHostSinkImpl : public OutlierDetectorHostSink {
public:
// Upstream::OutlierDetectorHostSink
void putHttpResponseCode(uint64_t) override {}
void putResponseTime(std::chrono::milliseconds) override {}
};

/**
* An implementation of an outlier detector. In the future we may support multiple outlier detection
* implementations with different configuration. For now, as we iterate everything is contained
* within this implementation.
*/
class OutlierDetectorImpl : public OutlierDetector {
public:
OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher);

// Upstream::OutlierDetector
void addChangedStateCb(ChangeStateCb cb) override { callbacks_.push_back(cb); }

private:
void addHostSink(HostPtr host);

std::list<ChangeStateCb> callbacks_;
std::unordered_map<HostPtr, OutlierDetectorHostSinkImpl*> host_sinks_;
};

} // Upstream
2 changes: 1 addition & 1 deletion source/common/upstream/resource_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ResourceManagerImpl : public ResourceManager {
const uint64_t max_;
std::atomic<uint64_t> current_{};
Runtime::Loader& runtime_;
std::string runtime_key_;
const std::string runtime_key_;
};

ResourceImpl connections_;
Expand Down
Loading