Skip to content

Commit

Permalink
http: improving shadowing with upstream filters (envoyproxy#23067)
Browse files Browse the repository at this point in the history
Fixing a bug where the router was not properly shut down for shadowing with upstream filters.
Fixing a second bug where headers were latched too late, and request headers were not isolated from shadow headers

Testing: new integration tests
Docs Changes: n/a
Release Notes: n/a
Part of envoyproxy#10455

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored Sep 13, 2022
1 parent edb6e2e commit 7ba4c86
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 26 deletions.
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
public:
AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options);
~AsyncStreamImpl() override { router_.onDestroy(); }

// Http::AsyncClient::Stream
void sendHeaders(RequestHeaderMap& headers, bool end_stream) override;
Expand Down
11 changes: 8 additions & 3 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
const auto& policy_ref = *shadow_policy;
if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
active_shadow_policies_.push_back(std::cref(policy_ref));
shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
}
}
}
Expand Down Expand Up @@ -804,6 +805,10 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea
Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);

if (shadow_headers_) {
shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
}

// upstream_requests_.size() cannot be > 1 because that only happens when a per
// try timeout occurs with hedge_on_per_try_timeout enabled but the per
// try timeout timer is not started until onRequestComplete(). It could be zero
Expand Down Expand Up @@ -881,12 +886,12 @@ void Filter::maybeDoShadowing() {
}

Http::RequestMessagePtr request(new Http::RequestMessageImpl(
Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_)));
Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
if (callbacks_->decodingBuffer()) {
request->body().add(*callbacks_->decodingBuffer());
}
if (downstream_trailers_) {
request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*downstream_trailers_));
if (shadow_trailers_) {
request->trailers(Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
}

auto options = Http::AsyncClient::RequestOptions()
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
MetadataMatchCriteriaConstPtr metadata_match_;
std::function<void(Http::ResponseHeaderMap&)> modify_headers_;
std::vector<std::reference_wrapper<const ShadowPolicy>> active_shadow_policies_{};
std::unique_ptr<Http::RequestHeaderMap> shadow_headers_;
std::unique_ptr<Http::RequestTrailerMap> shadow_trailers_;
// The stream lifetime configured by request header.
absl::optional<std::chrono::milliseconds> dynamic_max_stream_duration_;
// list of cookies to add to upstream headers
Expand Down
14 changes: 4 additions & 10 deletions test/integration/filters/add_body_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,9 @@ class AddBodyStreamFilter : public Http::PassThroughFilter {
}

Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override {
// Ensure that decodeData is only called for HTTP/3 (where protocol is set at the
// connection level). In HTTP/3 the FIN arrives separately so we will get
// decodeData() with an empty body.
// decodeData is called for HTTP/3 where the FIN arrives separately from headers.
if (config_->where_to_add_body_ == test::integration::filters::AddBodyFilterConfig::DEFAULT) {
if (end_stream && decoder_callbacks_->connection()->streamInfo().protocol() &&
data.length() == 0u) {
if (end_stream && data.length() == 0u) {
data.add("body");
}
} else if (config_->where_to_add_body_ ==
Expand All @@ -83,12 +80,9 @@ class AddBodyStreamFilter : public Http::PassThroughFilter {
}

Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override {
// Ensure that encodeData is only called for HTTP/3 (where protocol is set at the
// connection level). In HTTP/3 the FIN arrives separately so we will get
// encodeData() with an empty body.
// encodeData is called for HTTP/3 where the FIN arrives separately from headers.
if (config_->where_to_add_body_ == test::integration::filters::AddBodyFilterConfig::DEFAULT) {
if (end_stream && decoder_callbacks_->connection()->streamInfo().protocol() &&
data.length() == 0) {
if (end_stream && data.length() == 0) {
data.add("body");
}
} else if (config_->where_to_add_body_ ==
Expand Down
96 changes: 83 additions & 13 deletions test/integration/shadow_policy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,32 @@ namespace {
class ShadowPolicyIntegrationTest : public testing::TestWithParam<Network::Address::IpVersion>,
public HttpIntegrationTest {
public:
ShadowPolicyIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) {
ShadowPolicyIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP2, GetParam()) {
setUpstreamProtocol(Http::CodecType::HTTP2);
autonomous_upstream_ = true;
setUpstreamCount(2);
}

void intitialConfigSetup(const std::string& cluster_name, const std::string& cluster_header) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* cluster = bootstrap.mutable_static_resources()->add_clusters();
cluster->MergeFrom(bootstrap.static_resources().clusters()[0]);
cluster->set_name(std::string(Envoy::RepickClusterFilter::ClusterName));
ConfigHelper::setHttp2(*cluster);
if (cluster_with_custom_filter_.has_value()) {
auto* cluster =
bootstrap.mutable_static_resources()->mutable_clusters(*cluster_with_custom_filter_);

ConfigHelper::HttpProtocolOptions protocol_options =
MessageUtil::anyConvert<ConfigHelper::HttpProtocolOptions>(
(*cluster->mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]);
protocol_options.add_http_filters()->set_name(filter_name_);
protocol_options.add_http_filters()->set_name("envoy.filters.http.upstream_codec");
(*cluster->mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
.PackFrom(protocol_options);
}
});

// Set the mirror policy with cluster header or cluster name.
Expand All @@ -39,20 +55,31 @@ class ShadowPolicyIntegrationTest : public testing::TestWithParam<Network::Addre
void sendRequestAndValidateResponse() {
codec_client_ = makeHttpConnection(lookupPort("http"));

Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}};

IntegrationStreamDecoderPtr response =
sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0, 0);
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(0U, upstream_request_->bodyLength());
codec_client_->makeHeaderOnlyRequest(default_request_headers_);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
EXPECT_EQ(0U, response->body().size());
if (filter_name_ != "add-body-filter") {
EXPECT_EQ(10U, response->body().size());
}
test_server_->waitForCounterEq("cluster.cluster_1.internal.upstream_rq_completed", 1);
test_server_->waitForCounterEq("cluster.cluster_1.internal.upstream_rq_completed", 1);

upstream_headers_ =
reinterpret_cast<AutonomousUpstream*>(fake_upstreams_[0].get())->lastRequestHeaders();
EXPECT_TRUE(upstream_headers_ != nullptr);
mirror_headers_ =
reinterpret_cast<AutonomousUpstream*>(fake_upstreams_[1].get())->lastRequestHeaders();
EXPECT_TRUE(mirror_headers_ != nullptr);

cleanupUpstreamAndDownstream();
}

absl::optional<int> cluster_with_custom_filter_;
std::string filter_name_ = "on-local-reply-filter";
std::unique_ptr<Http::TestRequestHeaderMapImpl> upstream_headers_;
std::unique_ptr<Http::TestRequestHeaderMapImpl> mirror_headers_;
};

INSTANTIATE_TEST_SUITE_P(IpVersions, ShadowPolicyIntegrationTest,
Expand All @@ -79,9 +106,52 @@ TEST_P(ShadowPolicyIntegrationTest, RequestMirrorPolicyWithClusterHeaderWithFilt

initialize();
sendRequestAndValidateResponse();
}

EXPECT_EQ(test_server_->counter("cluster.cluster_1.upstream_cx_total")->value(), 1);
EXPECT_EQ(test_server_->counter("cluster.cluster_0.upstream_cx_total")->value(), 1);
// Test request mirroring / shadowing with the original cluster having a local reply filter.
TEST_P(ShadowPolicyIntegrationTest, OriginalClusterWithLocalReply) {
intitialConfigSetup("cluster_1", "");
cluster_with_custom_filter_ = 0;
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_EQ("400", response->headers().getStatusValue());
}

// Test request mirroring / shadowing with the mirror cluster having a local reply filter.
TEST_P(ShadowPolicyIntegrationTest, MirrorClusterWithLocalReply) {
intitialConfigSetup("cluster_1", "");
cluster_with_custom_filter_ = 1;
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_EQ("200", response->headers().getStatusValue());
}

TEST_P(ShadowPolicyIntegrationTest, OriginalClusterWithAddBody) {
intitialConfigSetup("cluster_1", "");
cluster_with_custom_filter_ = 0;
filter_name_ = "add-body-filter";

initialize();
sendRequestAndValidateResponse();
EXPECT_EQ(upstream_headers_->getContentLengthValue(), "4");
EXPECT_EQ(mirror_headers_->getContentLengthValue(), "");
}

TEST_P(ShadowPolicyIntegrationTest, MirrorClusterWithAddBody) {
intitialConfigSetup("cluster_1", "");
cluster_with_custom_filter_ = 1;
filter_name_ = "add-body-filter";

initialize();
sendRequestAndValidateResponse();
EXPECT_EQ(upstream_headers_->getContentLengthValue(), "");
EXPECT_EQ(mirror_headers_->getContentLengthValue(), "4");
}

} // namespace
Expand Down

0 comments on commit 7ba4c86

Please sign in to comment.