Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

health check: fix more fallout from inline deletion change #6988

Merged
merged 12 commits into from
May 21, 2019
22 changes: 14 additions & 8 deletions source/common/upstream/health_checker_base_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster,
}

HealthCheckerImplBase::~HealthCheckerImplBase() {
// Make sure that any sessions that were deferred deleted are cleared before we destruct.
dispatcher_.clearDeferredDeleteList();
// ASSERTs inside the session destructor check to make sure we have been previously deferred
// deleted. Unify that logic here before actual destruction happens.
for (auto& session : active_sessions_) {
session.second->onDeferredDeleteBase();
}
}

void HealthCheckerImplBase::decHealthy() {
Expand Down Expand Up @@ -226,19 +229,22 @@ HealthCheckerImplBase::ActiveHealthCheckSession::ActiveHealthCheckSession(
}

HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() {
if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
parent_.decHealthy();
}
if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
parent_.decDegraded();
}
// Make sure onDeferredDeleteBase() has been called. We should not reference our parent at this
// point since we may have been deferred deleted.
ASSERT(interval_timer_ == nullptr && timeout_timer_ == nullptr);
}

void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() {
// The session is about to be deferred deleted. Make sure all timers are gone and any
// implementation specific state is destroyed.
interval_timer_.reset();
timeout_timer_.reset();
if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
parent_.decHealthy();
}
if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
parent_.decDegraded();
}
onDeferredDelete();
}

Expand Down
3 changes: 0 additions & 3 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSessio
local_address_(std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1")) {}

HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

Expand Down Expand Up @@ -356,7 +355,6 @@ TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster,
receive_bytes_(TcpHealthCheckMatcher::loadProtoBytes(config.tcp_health_check().receive())) {}

TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

Expand Down Expand Up @@ -464,7 +462,6 @@ GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSessio
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(client_ == nullptr);
}

Expand Down
1 change: 0 additions & 1 deletion source/extensions/health_checkers/redis/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession
: ActiveHealthCheckSession(parent, host), parent_(parent) {}

RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() {
onDeferredDelete();
ASSERT(current_request_ == nullptr);
ASSERT(client_ == nullptr);
}
Expand Down
3 changes: 0 additions & 3 deletions test/common/upstream/hds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ TEST_F(HdsTest, TestProcessMessageHealthChecks) {
// Check Correctness
EXPECT_EQ(hds_delegate_->hdsClusters()[0]->healthCheckers().size(), 2);
EXPECT_EQ(hds_delegate_->hdsClusters()[1]->healthCheckers().size(), 3);
EXPECT_CALL(dispatcher_, clearDeferredDeleteList()).Times(5);
}

// Tests OnReceiveMessage given a minimal HealthCheckSpecifier message
Expand Down Expand Up @@ -325,8 +324,6 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) {
.socket_address()
.port_value(),
1234);

EXPECT_CALL(dispatcher_, clearDeferredDeleteList());
}

} // namespace Upstream
Expand Down
1 change: 0 additions & 1 deletion test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ TEST(HealthCheckerFactoryTest, CreateGrpc) {
Event::MockDispatcher dispatcher;
AccessLog::MockAccessLogManager log_manager;

EXPECT_CALL(dispatcher, clearDeferredDeleteList());
EXPECT_NE(nullptr, dynamic_cast<GrpcHealthCheckerImpl*>(
HealthCheckerFactory::create(createGrpcHealthCheckConfig(), cluster,
runtime, random, dispatcher, log_manager)
Expand Down
33 changes: 28 additions & 5 deletions test/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ void ConfigHelper::finalize(const std::vector<uint32_t>& ports) {
*cluster->mutable_transport_socket(), tls_config);
}
}
ASSERT(port_idx == ports.size() || eds_hosts || custom_cluster);
ASSERT(port_idx == ports.size() || eds_hosts || custom_cluster ||
bootstrap_.dynamic_resources().has_cds_config());

if (!connect_timeout_set_) {
#ifdef __APPLE__
Expand Down Expand Up @@ -598,15 +599,31 @@ void ConfigHelper::addConfigModifier(HttpModifierFunction function) {
});
}

CdsHelper::CdsHelper() : cds_path_(TestEnvironment::writeStringToFileForTest("cds.pb_text", "")) {}

void CdsHelper::setCds(const std::vector<envoy::api::v2::Cluster>& clusters) {
// Write to file the DiscoveryResponse and trigger inotify watch.
envoy::api::v2::DiscoveryResponse cds_response;
cds_response.set_version_info(std::to_string(cds_version_++));
cds_response.set_type_url(Config::TypeUrl::get().Cluster);
for (const auto& cluster : clusters) {
cds_response.add_resources()->PackFrom(cluster);
}
// Past the initial write, need move semantics to trigger inotify move event that the
// FilesystemSubscriptionImpl is subscribed to.
std::string path =
TestEnvironment::writeStringToFileForTest("cds.update.pb_text", cds_response.DebugString());
TestUtility::renameFile(path, cds_path_);
}

EdsHelper::EdsHelper() : eds_path_(TestEnvironment::writeStringToFileForTest("eds.pb_text", "")) {
// cluster.cluster_0.update_success will be incremented on the initial
// load when Envoy comes up.
++update_successes_;
}

void EdsHelper::setEds(
const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments,
IntegrationTestServerStats& server_stats) {
const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments) {
// Write to file the DiscoveryResponse and trigger inotify watch.
envoy::api::v2::DiscoveryResponse eds_response;
eds_response.set_version_info(std::to_string(eds_version_++));
Expand All @@ -619,10 +636,16 @@ void EdsHelper::setEds(
std::string path =
TestEnvironment::writeStringToFileForTest("eds.update.pb_text", eds_response.DebugString());
TestUtility::renameFile(path, eds_path_);
}

void EdsHelper::setEdsAndWait(
const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments,
IntegrationTestServerStats& server_stats) {
setEds(cluster_load_assignments);
// Make sure Envoy has consumed the update now that it is running.
server_stats.waitForCounterGe("cluster.cluster_0.update_success", ++update_successes_);
++update_successes_;
server_stats.waitForCounterGe("cluster.cluster_0.update_success", update_successes_);
RELEASE_ASSERT(
update_successes_ == server_stats.counter("cluster.cluster_0.update_success")->value(), "");
}

} // namespace Envoy
19 changes: 17 additions & 2 deletions test/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,29 @@ class ConfigHelper {
bool finalized_{false};
};

class CdsHelper {
public:
CdsHelper();

// Set CDS contents on filesystem.
void setCds(const std::vector<envoy::api::v2::Cluster>& cluster);
const std::string& cds_path() const { return cds_path_; }

private:
const std::string cds_path_;
uint32_t cds_version_{};
};

// Common code for tests that deliver EDS update via the filesystem.
class EdsHelper {
public:
EdsHelper();

// Set EDS contents on filesystem and wait for Envoy to pick this up.
void setEds(const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments,
IntegrationTestServerStats& server_stats);
void setEds(const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments);
void
setEdsAndWait(const std::vector<envoy::api::v2::ClusterLoadAssignment>& cluster_load_assignments,
IntegrationTestServerStats& server_stats);
const std::string& eds_path() const { return eds_path_; }

private:
Expand Down
1 change: 0 additions & 1 deletion test/extensions/health_checkers/redis/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ TEST(HealthCheckerFactoryTest, CreateRedisViaUpstreamHealthCheckerFactory) {
Event::MockDispatcher dispatcher;
AccessLog::MockAccessLogManager log_manager;

EXPECT_CALL(dispatcher, clearDeferredDeleteList());
EXPECT_NE(nullptr, dynamic_cast<CustomRedisHealthChecker*>(
Upstream::HealthCheckerFactory::create(
Upstream::parseHealthCheckFromV2Yaml(yaml), cluster, runtime, random,
Expand Down
127 changes: 93 additions & 34 deletions test/integration/eds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class EdsIntegrationTest : public testing::TestWithParam<Network::Address::IpVer
// filesystem delivery to simplify test mechanics.
void setEndpoints(uint32_t total_endpoints, uint32_t healthy_endpoints,
uint32_t degraded_endpoints, bool remaining_unhealthy = true,
absl::optional<uint32_t> overprovisioning_factor = absl::nullopt) {
absl::optional<uint32_t> overprovisioning_factor = absl::nullopt,
bool await_update = true) {
ASSERT(total_endpoints >= healthy_endpoints + degraded_endpoints);
envoy::api::v2::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("cluster_0");
Expand All @@ -45,42 +46,108 @@ class EdsIntegrationTest : public testing::TestWithParam<Network::Address::IpVer
: envoy::api::v2::core::HealthStatus::UNKNOWN);
}
}
eds_helper_.setEds({cluster_load_assignment}, *test_server_);

if (await_update) {
eds_helper_.setEdsAndWait({cluster_load_assignment}, *test_server_);
} else {
eds_helper_.setEds({cluster_load_assignment});
}
}

void initializeTest(bool http_active_hc) {
setUpstreamCount(4);
if (use_http2_hc_) {
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
}
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
// Switch predefined cluster_0 to CDS filesystem sourcing.
bootstrap.mutable_dynamic_resources()->mutable_cds_config()->set_path(cds_helper_.cds_path());
bootstrap.mutable_static_resources()->clear_clusters();
});

// Set validate_clusters to false to allow us to reference a CDS cluster.
config_helper_.addConfigModifier(
[this, http_active_hc](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
// Switch predefined cluster_0 to EDS filesystem sourcing.
auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0);
cluster_0->mutable_hosts()->Clear();
cluster_0->set_type(envoy::api::v2::Cluster::EDS);
auto* eds_cluster_config = cluster_0->mutable_eds_cluster_config();
eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path());
if (http_active_hc) {
auto* health_check = cluster_0->add_health_checks();
health_check->mutable_timeout()->set_seconds(30);
// TODO(mattklein123): Consider using simulated time here.
health_check->mutable_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(100));
health_check->mutable_no_traffic_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(100));
health_check->mutable_unhealthy_threshold()->set_value(1);
health_check->mutable_healthy_threshold()->set_value(1);
health_check->mutable_http_health_check()->set_path("/healthcheck");
}
});
[](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager&
hcm) { hcm.mutable_route_config()->mutable_validate_clusters()->set_value(false); });

cluster_.mutable_connect_timeout()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(100));
cluster_.set_name("cluster_0");
cluster_.mutable_hosts()->Clear();
cluster_.set_type(envoy::api::v2::Cluster::EDS);
auto* eds_cluster_config = cluster_.mutable_eds_cluster_config();
eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path());
if (http_active_hc) {
auto* health_check = cluster_.add_health_checks();
health_check->mutable_timeout()->set_seconds(30);
// TODO(mattklein123): Consider using simulated time here.
health_check->mutable_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(100));
health_check->mutable_no_traffic_interval()->CopyFrom(
Protobuf::util::TimeUtil::MillisecondsToDuration(100));
health_check->mutable_unhealthy_threshold()->set_value(1);
health_check->mutable_healthy_threshold()->set_value(1);
health_check->mutable_http_health_check()->set_path("/healthcheck");
health_check->mutable_http_health_check()->set_use_http2(use_http2_hc_);
}
setEndpoints(0, 0, 0, true, absl::nullopt, false);
cds_helper_.setCds({cluster_});
initialize();
setEndpoints(0, 0, 0);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
}

bool use_http2_hc_{};
EdsHelper eds_helper_;
CdsHelper cds_helper_;
envoy::api::v2::Cluster cluster_;
};

INSTANTIATE_TEST_SUITE_P(IpVersions, EdsIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

// Verifies that a new cluster can we warmed when using HTTP/2 health checking. Regression test
// of the issue detailed in issue #6951.
TEST_P(EdsIntegrationTest, Http2HcClusterRewarming) {
use_http2_hc_ = true;
initializeTest(true);
fake_upstreams_[0]->set_allow_unexpected_disconnects(true);
setEndpoints(1, 0, 0, false);
EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value());
EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value());

// Wait for the first HC and verify the host is healthy. This should warm the initial cluster.
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true);
test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 1);
EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value());

// Trigger a CDS update. This should cause a new cluster to require warming, blocked on the host
// being health checked.
cluster_.mutable_circuit_breakers()->add_thresholds()->mutable_max_connections()->set_value(100);
cds_helper_.setCds({cluster_});
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
EXPECT_EQ(1, test_server_->gauge("cluster_manager.warming_clusters")->value());

// We need to do a bunch of work to get a hold of second hc connection.
FakeHttpConnectionPtr fake_upstream_connection;
auto result = fake_upstreams_[0]->waitForHttpConnection(
*dispatcher_, fake_upstream_connection, TestUtility::DefaultTimeout, max_request_headers_kb_);
RELEASE_ASSERT(result, result.message());

FakeStreamPtr upstream_request;
result = fake_upstream_connection->waitForNewStream(*dispatcher_, upstream_request);
RELEASE_ASSERT(result, result.message());
// Wait for the stream to be completely received.
result = upstream_request->waitForEndStream(*dispatcher_);
RELEASE_ASSERT(result, result.message());

// Respond with a health check. This will cause the previous cluster to be destroyed inline as
// part of processing the response.
upstream_request->encodeHeaders(Http::TestHeaderMapImpl{{":status", "503"}}, true);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
EXPECT_EQ(0, test_server_->gauge("cluster_manager.warming_clusters")->value());
}

// Verify that a host stabilized via active health checking which is first removed from EDS and
// then fails health checking is removed.
TEST_P(EdsIntegrationTest, RemoveAfterHcFail) {
Expand Down Expand Up @@ -111,11 +178,7 @@ TEST_P(EdsIntegrationTest, RemoveAfterHcFail) {

// Verifies that endpoints are ignored until health checked when configured to.
TEST_P(EdsIntegrationTest, EndpointWarmingSuccessfulHc) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
// Switch predefined cluster_0 to EDS filesystem sourcing.
auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0);
cluster_0->mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true);
});
cluster_.mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true);

// Endpoints are initially excluded.
initializeTest(true);
Expand All @@ -138,11 +201,7 @@ TEST_P(EdsIntegrationTest, EndpointWarmingSuccessfulHc) {
// Verifies that endpoints are ignored until health checked when configured to when the first
// health check fails.
TEST_P(EdsIntegrationTest, EndpointWarmingFailedHc) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
// Switch predefined cluster_0 to EDS filesystem sourcing.
auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0);
cluster_0->mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true);
});
cluster_.mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true);

// Endpoints are initially excluded.
initializeTest(true);
Expand Down Expand Up @@ -255,7 +314,7 @@ TEST_P(EdsIntegrationTest, BatchMemberUpdateCb) {
auto* endpoint = locality_lb_endpoints->add_lb_endpoints();
setUpstreamAddress(1, *endpoint);

eds_helper_.setEds({cluster_load_assignment}, *test_server_);
eds_helper_.setEdsAndWait({cluster_load_assignment}, *test_server_);

EXPECT_EQ(1, member_update_count);
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/load_stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class LoadStatsIntegrationTest : public testing::TestWithParam<Network::Address:
for (uint32_t index : p1_dragon_upstreams.endpoints_) {
addEndpoint(*dragon_p1, index, num_endpoints);
}
eds_helper_.setEds({cluster_load_assignment}, *test_server_);
eds_helper_.setEdsAndWait({cluster_load_assignment}, *test_server_);
}

void createUpstreams() override {
Expand Down