Skip to content

Commit

Permalink
internal redirect: fix a lifetime bug (envoyproxy#785)
Browse files Browse the repository at this point in the history
Signed-off-by: Pradeep Rao <pcrao@google.com>
  • Loading branch information
pradeepcrao authored and jacob-delgado committed May 26, 2022
1 parent c785832 commit 41cf495
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 15 deletions.
6 changes: 6 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,12 @@ void ConnectionManagerImpl::ActiveStream::recreateStream(
filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain);
}

// Make sure that relevant information makes it from the original stream info
// to the new one. Generally this should consist of all downstream related
// data, and not include upstream related data.
(*connection_manager_.streams_.begin())
->filter_manager_.streamInfo()
.setFromForRecreateStream(filter_manager_.streamInfo());
new_stream.decodeHeaders(std::move(request_headers_), !proxy_body);
if (proxy_body) {
// This functionality is currently only used for internal redirects, which the router only
Expand Down
7 changes: 3 additions & 4 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() {
return parent_.buffered_request_data_;
}

bool ActiveStreamDecoderFilter::complete() { return parent_.state_.remote_decode_complete_; }
bool ActiveStreamDecoderFilter::complete() { return parent_.remoteDecodeComplete(); }

void ActiveStreamDecoderFilter::doHeaders(bool end_stream) {
parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream);
Expand Down Expand Up @@ -843,9 +843,8 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa
}

void FilterManager::maybeEndDecode(bool end_stream) {
ASSERT(!state_.remote_decode_complete_);
state_.remote_decode_complete_ = end_stream;
if (end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !remoteDecodeComplete()) {
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(dispatcher().timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
}
Expand Down
16 changes: 9 additions & 7 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,10 @@ class FilterManager : public ScopeTrackedObject,
/**
* Whether remote processing has been marked as complete.
*/
bool remoteDecodeComplete() const { return state_.remote_decode_complete_; }
bool remoteDecodeComplete() const {
return stream_info_.downstreamTiming() &&
stream_info_.downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}

/**
* Instructs the FilterManager to not create a filter chain. This makes it possible to issue
Expand Down Expand Up @@ -1058,15 +1061,14 @@ class FilterManager : public ScopeTrackedObject,

struct State {
State()
: remote_encode_complete_(false), remote_decode_complete_(false), local_complete_(false),
has_1xx_headers_(false), created_filter_chain_(false), is_head_request_(false),
is_grpc_request_(false), non_100_response_headers_encoded_(false),
under_on_local_reply_(false), decoder_filter_chain_aborted_(false),
encoder_filter_chain_aborted_(false), saw_downstream_reset_(false) {}
: remote_encode_complete_(false), local_complete_(false), has_1xx_headers_(false),
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false),
non_100_response_headers_encoded_(false), under_on_local_reply_(false),
decoder_filter_chain_aborted_(false), encoder_filter_chain_aborted_(false),
saw_downstream_reset_(false) {}
uint32_t filter_call_state_{0};

bool remote_encode_complete_ : 1;
bool remote_decode_complete_ : 1;
bool local_complete_ : 1; // This indicates that local is complete prior to filter processing.
// A filter can still stop the stream from being complete as seen
// by the codec.
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,8 +1203,10 @@ ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
}

void ServerConnectionImpl::onResetStream(StreamResetReason reason) {
active_request_->response_encoder_.runResetCallbacks(reason);
connection_.dispatcher().deferredDelete(std::move(active_request_));
if (active_request_) {
active_request_->response_encoder_.runResetCallbacks(reason);
connection_.dispatcher().deferredDelete(std::move(active_request_));
}
}

Status ServerConnectionImpl::sendProtocolError(absl::string_view details) {
Expand Down
18 changes: 16 additions & 2 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,23 @@ struct StreamInfoImpl : public StreamInfo {
ASSERT(downstream_bytes_meter_.get() == downstream_bytes_meter.get());
}

// This function is used to persist relevant information from the original
// stream into to the new one, when recreating the stream. Generally this
// includes information about the downstream stream, but not the upstream
// stream.
void setFromForRecreateStream(StreamInfo& info) {
downstream_timing_ = info.downstreamTiming();
protocol_ = info.protocol();
bytes_received_ = info.bytesReceived();
downstream_bytes_meter_ = info.getDownstreamBytesMeter();
// These two are set in the constructor, but to T(recreate), and should be T(create)
start_time_ = info.startTime();
start_time_monotonic_ = info.startTimeMonotonic();
}

TimeSource& time_source_;
const SystemTime start_time_;
const MonotonicTime start_time_monotonic_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
absl::optional<MonotonicTime> final_time_;

absl::optional<Http::Protocol> protocol_;
Expand Down
28 changes: 28 additions & 0 deletions test/common/stream_info/stream_info_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,34 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) {
}
}

TEST_F(StreamInfoImplTest, SetFrom) {
StreamInfoImpl s1(Http::Protocol::Http2, test_time_.timeSystem(), nullptr);

s1.addBytesReceived(1);
s1.downstreamTiming().onLastDownstreamRxByteReceived(test_time_.timeSystem());

#ifdef __clang__
#if defined(__linux__)
#if defined(__has_feature) && !(__has_feature(thread_sanitizer))
ASSERT_TRUE(sizeof(s1) == 760 || sizeof(s1) == 776 || sizeof(s1) == 800)
<< "If adding fields to StreamInfoImpl, please check to see if you "
"need to add them to setFromForRecreateStream! Current size "
<< sizeof(s1);
#endif
#endif
#endif

StreamInfoImpl s2(Http::Protocol::Http11, test_time_.timeSystem(), nullptr);
s2.setFromForRecreateStream(s1);
EXPECT_EQ(s1.startTime(), s2.startTime());
EXPECT_EQ(s1.startTimeMonotonic(), s2.startTimeMonotonic());
EXPECT_EQ(s1.downstreamTiming().lastDownstreamRxByteReceived(),
s2.downstreamTiming().lastDownstreamRxByteReceived());
EXPECT_EQ(s1.protocol(), s2.protocol());
EXPECT_EQ(s1.bytesReceived(), s2.bytesReceived());
EXPECT_EQ(s1.getDownstreamBytesMeter(), s2.getDownstreamBytesMeter());
}

TEST_F(StreamInfoImplTest, DynamicMetadataTest) {
StreamInfoImpl stream_info(Http::Protocol::Http2, test_time_.timeSystem(), nullptr);

Expand Down
46 changes: 46 additions & 0 deletions test/integration/cds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,52 @@ TEST_P(CdsIntegrationTest, TwoClusters) {
cleanupUpstreamAndDownstream();
}

// Test internal redirect to a cluster removed during the backend think time.
TEST_P(CdsIntegrationTest, TwoClustersAndRedirects) {
setDownstreamProtocol(Http::CodecType::HTTP1);
config_helper_.addConfigModifier(
[](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) {
auto* route = hcm.mutable_route_config()->mutable_virtual_hosts(0)->mutable_routes(1);
route->mutable_route()
->mutable_internal_redirect_policy()
->mutable_redirect_response_codes()
->Add(302);
});

// Tell Envoy that cluster_2 is here.
initialize();
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster2_}, {}, "42");
// The '3' includes the fake CDS server.
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3);
// Tell Envoy that cluster_1 is gone.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
{cluster2_}, {}, {ClusterName1}, "43");
test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1);

codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http"))));
default_request_headers_.setPath("/cluster2");
default_request_headers_.setContentLength("4");
auto encoder_decoder = codec_client_->startRequest(default_request_headers_);
Buffer::OwnedImpl data("body");
encoder_decoder.first.encodeData(data, true);
auto& response = encoder_decoder.second;

ASSERT_TRUE(fake_upstreams_[UpstreamIndex2]->waitForHttpConnection(*dispatcher_,
fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

Http::TestResponseHeaderMapImpl redirect_response{
{":status", "302"}, {"content-length", "0"}, {"location", "http://host/cluster1"}};

// Send a response to the original request redirecting to the deleted cluster.
upstream_request_->encodeHeaders(redirect_response, true);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_EQ("503", response->headers().getStatusValue());
}

// Tests that when Envoy's delta xDS stream dis/reconnects, Envoy can inform the server of the
// resources it already has: the reconnected stream need not start with a state-of-the-world update.
TEST_P(CdsIntegrationTest, VersionsRememberedAfterReconnect) {
Expand Down

0 comments on commit 41cf495

Please sign in to comment.