Skip to content

Commit

Permalink
router: fixing a watermark bug for streaming retries (envoyproxy#10866)
Browse files Browse the repository at this point in the history
Fixes an issue where, if a retry was attempted while the upstream connection was over its high watermark, data might spool upstream but reading from downstream would not resume. This is a preexisting design flaw that only manifests now that we have streaming retries: if the upstream buffer limit is smaller than the downstream buffer limit and the upstream backs up due to upstream slowness, such retries time out rather than succeed. It did not manifest before because, when the whole request has already been read, the loop that unwinds the pause between requests takes care of it.

Risk Level: Medium (watermarks)
Testing: new unit test
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
Signed-off-by: pengg <pengg@google.com>
  • Loading branch information
alyssawilk authored and penguingao committed Apr 22, 2020
1 parent 2f01698 commit ea3490a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
12 changes: 11 additions & 1 deletion source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ UpstreamRequest::~UpstreamRequest() {
upstream_log->log(parent_.downstreamHeaders(), upstream_headers_.get(),
upstream_trailers_.get(), stream_info_);
}

while (downstream_data_disabled_ != 0) {
parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
parent_.cluster()->stats().upstream_flow_control_drained_total_.inc();
--downstream_data_disabled_;
}
}

void UpstreamRequest::decode100ContinueHeaders(Http::ResponseHeaderMapPtr&& headers) {
Expand Down Expand Up @@ -421,7 +427,6 @@ void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermar
// can disable reads from upstream.
ASSERT(!parent_.parent_.finalUpstreamRequest() ||
&parent_ == parent_.parent_.finalUpstreamRequest());

// The downstream connection is overrun. Pause reads from upstream.
// If there are multiple calls to readDisable either the codec (H2) or the underlying
// Network::Connection (H1) will handle reference counting.
Expand Down Expand Up @@ -451,6 +456,7 @@ void UpstreamRequest::disableDataFromDownstreamForFlowControl() {
ASSERT(parent_.upstreamRequests().size() == 1 || parent_.downstreamEndStream());
parent_.cluster()->stats().upstream_flow_control_backed_up_total_.inc();
parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark();
++downstream_data_disabled_;
}

void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
Expand All @@ -466,6 +472,10 @@ void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
ASSERT(parent_.upstreamRequests().size() == 1 || parent_.downstreamEndStream());
parent_.cluster()->stats().upstream_flow_control_drained_total_.inc();
parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark();
ASSERT(downstream_data_disabled_ != 0);
if (downstream_data_disabled_ > 0) {
--downstream_data_disabled_;
}
}

void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
Http::ResponseTrailerMapPtr upstream_trailers_;
Http::MetadataMapVector downstream_metadata_map_vector_;

// Tracks the number of times the flow of data from downstream has been disabled.
uint32_t downstream_data_disabled_{};
bool calling_encode_headers_ : 1;
bool upstream_canary_ : 1;
bool decode_complete_ : 1;
Expand Down
32 changes: 28 additions & 4 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5718,9 +5718,10 @@ class WatermarkTest : public RouterTest {
.WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

EXPECT_CALL(stream_, addCallbacks(_)).WillOnce(Invoke([&](Http::StreamCallbacks& callbacks) {
stream_callbacks_ = &callbacks;
}));
EXPECT_CALL(stream_, addCallbacks(_))
.Times(num_add_callbacks_)
.WillOnce(
Invoke([&](Http::StreamCallbacks& callbacks) { stream_callbacks_ = &callbacks; }));
EXPECT_CALL(encoder_, getStream()).WillRepeatedly(ReturnRef(stream_));
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke(
Expand Down Expand Up @@ -5751,6 +5752,7 @@ class WatermarkTest : public RouterTest {
Http::ResponseDecoder* response_decoder_ = nullptr;
Http::TestRequestHeaderMapImpl headers_;
Http::ConnectionPool::Callbacks* pool_callbacks_{nullptr};
int num_add_callbacks_{1};
};

TEST_F(WatermarkTest, DownstreamWatermarks) {
Expand Down Expand Up @@ -5830,7 +5832,29 @@ TEST_F(WatermarkTest, FilterWatermarks) {
.value());

sendResponse();
} // namespace Router
}

// Checks that a flow control blockage applied while the request body is being
// buffered is undone once the connection pool reports a failure.
TEST_F(WatermarkTest, FilterWatermarksUnwound) {
  // The fixture uses this value as the expected call count for addCallbacks() on the mock stream.
  num_add_callbacks_ = 0;
  EXPECT_CALL(callbacks_, decoderBufferLimit()).Times(3).WillRepeatedly(Return(10));
  router_.setDecoderFilterCallbacks(callbacks_);

  // Send the headers sans-fin, and don't flag the pool as ready.
  sendRequest(false, false);

  // An 11 byte body is one byte more than the 10 byte buffer limit configured above.
  Buffer::OwnedImpl body("1234567890!");
  router_.decodeData(body, false);

  // Both counters checked below live in the same cluster stats store.
  auto& cluster_stats = cm_.thread_local_cluster_.cluster_.info_->stats_store_;
  EXPECT_EQ(1U, cluster_stats.counter("upstream_flow_control_backed_up_total").value());

  // Report a pool failure, then confirm the blockage recorded above has been drained.
  pool_callbacks_->onPoolFailure(Http::ConnectionPool::PoolFailureReason::RemoteConnectionFailure,
                                 absl::string_view(), nullptr);
  EXPECT_EQ(1U, cluster_stats.counter("upstream_flow_control_drained_total").value());
}

// Same as RetryRequestNotComplete but with decodeData larger than the buffer
// limit, no retry will occur.
Expand Down

0 comments on commit ea3490a

Please sign in to comment.