http: delaying attach pending requests #2871

Merged · 16 commits · Apr 24, 2018
43 changes: 36 additions & 7 deletions source/common/http/http1/conn_pool.cc
@@ -20,6 +20,12 @@ namespace Envoy {
namespace Http {
namespace Http1 {

ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options)
: dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options),
upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {}

ConnPoolImpl::~ConnPoolImpl() {
while (!ready_clients_.empty()) {
ready_clients_.front()->codec_client_->close();
@@ -180,7 +186,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv
// whether the client is in the ready list (connected) or the busy list (failed to connect).
if (event == Network::ConnectionEvent::Connected) {
conn_connect_ms_->complete();
processIdleClient(client);
processIdleClient(client, false);
}
}

@@ -209,25 +215,48 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
host_->cluster().stats().upstream_cx_max_requests_.inc();
onDownstreamReset(client);
} else {
processIdleClient(client);
// Upstream connection might be closed right after response is complete. Setting delay=true
// here to attach pending requests in next dispatcher loop to handle that case.
// https://github.com/envoyproxy/envoy/issues/2715
processIdleClient(client, true);
}
}

void ConnPoolImpl::onUpstreamReady() {
upstream_ready_enabled_ = false;
while (!pending_requests_.empty() && !ready_clients_.empty()) {
Contributor
Sanity check: this is simply deferring work we used to do in the last dispatcher loop to the next loop, so there's no corner case where we'll end up batching together too much work and triggering the watchdog, right?

Member Author
The chance of batching together is when multiple upstream connections are completing responses at the same time (in the same epoll callback); I don't think there will be too much work to trigger the watchdog.

ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
// There is work to do so bind a request to the client and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
attachRequestToClient(client, pending_requests_.back()->decoder_,
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
client.moveBetweenLists(ready_clients_, busy_clients_);
}
}

void ConnPoolImpl::processIdleClient(ActiveClient& client) {
void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
client.stream_wrapper_.reset();
if (pending_requests_.empty()) {
// There is nothing to service so just move the connection into the ready list.
if (pending_requests_.empty() || delay) {
// There is nothing to service or delayed processing is requested, so just move the connection
// into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_);
client.moveBetweenLists(busy_clients_, ready_clients_);
Contributor
@lizan Too early to mark the client ready in this branch. Expecting another read attempt to see if there is an EOF. This client could be immediately used by another downstream request.
At the next cycle, the dispatcher always handles the write attempt first, so even if there is an EOF in the read buffer, that is a 503 for this upstream request.

Member Author
At the end of the upstream read event when we reach EOF the client will be removed so this is safe. https://github.com/envoyproxy/envoy/blob/v1.10.0/source/common/network/connection_impl.cc#L486

} else {
// There is work to do so bind a request to the client and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
// There is work to do immediately so bind a request to the client and move it to the busy list.
// Pending requests are pushed onto the front, so pull from the back.
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
attachRequestToClient(client, pending_requests_.back()->decoder_,
pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
}

if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
}

checkForDrained();
}

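The mechanism in the diff above is small but easy to miss: instead of attaching a pending request the moment a response completes, the pool parks the connection in the ready list and arms a 0ms timer, so the attach happens on the next dispatcher iteration, after any FIN that arrived with the final response bytes has been processed. The following is a minimal standalone sketch of that pattern, not Envoy code; `MiniDispatcher`, `MiniConnPool`, and the integer client IDs are illustrative stand-ins for the real dispatcher, connection pool, and ActiveClient objects.

```cpp
#include <algorithm>
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for an event loop: callbacks posted here run on the *next*
// iteration, mimicking a timer enabled with a 0ms delay.
class MiniDispatcher {
public:
  void postNextIteration(std::function<void()> cb) { queued_.push_back(std::move(cb)); }
  void runOneIteration() {
    std::vector<std::function<void()>> work;
    work.swap(queued_);
    for (auto& cb : work) {
      cb();
    }
  }

private:
  std::vector<std::function<void()>> queued_;
};

class MiniConnPool {
public:
  explicit MiniConnPool(MiniDispatcher& dispatcher) : dispatcher_(dispatcher) {}

  void newStream(std::string request) { pending_requests_.push_back(std::move(request)); }

  // Called when an upstream finishes a response. Instead of binding a pending
  // request immediately, park the client and schedule the attach for the next
  // iteration, in case a FIN is sitting right behind the response bytes.
  void onResponseComplete(int client_id) {
    ready_clients_.push_back(client_id);
    if (!pending_requests_.empty() && !upstream_ready_enabled_) {
      upstream_ready_enabled_ = true;
      dispatcher_.postNextIteration([this] { onUpstreamReady(); });
    }
  }

  // If the remote close is observed before the deferred attach runs, the dead
  // client is simply dropped from the ready list.
  void onRemoteClose(int client_id) {
    ready_clients_.erase(std::remove(ready_clients_.begin(), ready_clients_.end(), client_id),
                         ready_clients_.end());
  }

private:
  void onUpstreamReady() {
    upstream_ready_enabled_ = false;
    while (!pending_requests_.empty() && !ready_clients_.empty()) {
      std::cout << "attaching " << pending_requests_.front() << " to client "
                << ready_clients_.front() << "\n";
      pending_requests_.pop_front();
      ready_clients_.pop_front();
    }
    if (!pending_requests_.empty()) {
      std::cout << pending_requests_.size()
                << " request(s) still pending; a new connection would be created"
                << " instead of reusing a closed one\n";
    }
  }

  MiniDispatcher& dispatcher_;
  std::deque<std::string> pending_requests_;
  std::deque<int> ready_clients_;
  bool upstream_ready_enabled_{false};
};

int main() {
  MiniDispatcher dispatcher;
  MiniConnPool pool(dispatcher);

  pool.newStream("request-2");        // request 2 is waiting for a connection
  pool.onResponseComplete(/*id=*/0);  // response 1 finishes on client 0
  pool.onRemoteClose(0);              // FIN arrives in the same read event
  dispatcher.runOneIteration();       // deferred attach no longer sees client 0
  return 0;
}
```

Running the sketch, the deferred attach finds no ready client and falls back to pending, which is the behavior the unit and integration tests below exercise.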
8 changes: 5 additions & 3 deletions source/common/http/http1/conn_pool.h
@@ -31,8 +31,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
public:
ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options)
: dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options) {}
const Network::ConnectionSocket::OptionsSharedPtr& options);

~ConnPoolImpl();

@@ -123,7 +122,8 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onDownstreamReset(ActiveClient& client);
void onPendingRequestCancel(PendingRequest& request);
void onResponseComplete(ActiveClient& client);
void processIdleClient(ActiveClient& client);
void onUpstreamReady();
void processIdleClient(ActiveClient& client, bool delay);

Stats::TimespanPtr conn_connect_ms_;
Event::Dispatcher& dispatcher_;
@@ -134,6 +134,8 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
std::list<DrainedCb> drained_callbacks_;
Upstream::ResourcePriority priority_;
const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
Event::TimerPtr upstream_ready_timer_;
bool upstream_ready_enabled_{false};
};

/**
88 changes: 85 additions & 3 deletions test/common/http/http1/conn_pool_test.cc
@@ -42,10 +42,11 @@ namespace Http1 {
class ConnPoolImplForTest : public ConnPoolImpl {
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster)
Upstream::ClusterInfoConstSharedPtr cluster,
NiceMock<Event::MockTimer>* upstream_ready_timer)
: ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"),
Upstream::ResourcePriority::Default, nullptr),
mock_dispatcher_(dispatcher) {}
mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) {}

~ConnPoolImplForTest() {
EXPECT_EQ(0U, ready_clients_.size());
@@ -98,7 +99,19 @@ class ConnPoolImplForTest : public ConnPoolImpl {
EXPECT_CALL(*test_client.connect_timer_, enableTimer(_));
}

void expectEnableUpstreamReady() {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation();
}

void expectAndRunUpstreamReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_timer_->callback_();
EXPECT_FALSE(upstream_ready_enabled_);
}

Event::MockDispatcher& mock_dispatcher_;
NiceMock<Event::MockTimer>* mock_upstream_ready_timer_;
std::vector<TestCodecClient> test_clients_;
};

@@ -107,7 +120,9 @@ class ConnPoolImplForTest : public ConnPoolImpl {
*/
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest() : conn_pool_(dispatcher_, cluster_) {}
Http1ConnPoolImplTest()
: upstream_ready_timer_(new NiceMock<Event::MockTimer>(&dispatcher_)),
conn_pool_(dispatcher_, cluster_, upstream_ready_timer_) {}

~Http1ConnPoolImplTest() {
// Make sure all gauges are 0.
@@ -118,6 +133,7 @@ class Http1ConnPoolImplTest : public testing::Test {

NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockTimer>* upstream_ready_timer_;
ConnPoolImplForTest conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
@@ -437,6 +453,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will immediately bind to request 2.
conn_pool_.expectEnableUpstreamReady();
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
@@ -445,6 +462,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_.expectAndRunUpstreamReady();
callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
response_headers.reset(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);
@@ -455,6 +473,67 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream closes connection without 'connection: close' like
* https://github.com/envoyproxy/envoy/pull/2715
*/
TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
InSequence s;

// Request 1 should kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder1;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks);

EXPECT_NE(nullptr, handle);

// Request 2 should not kick off a new connection.
NiceMock<Http::MockStreamDecoder> outer_decoder2;
ConnPoolCallbacks callbacks2;
handle = conn_pool_.newStream(outer_decoder2, callbacks2);
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_overflow_.value());

EXPECT_NE(nullptr, handle);

// Connect event will bind to request 1.
NiceMock<Http::MockStreamEncoder> request_encoder;
Http::StreamDecoder* inner_decoder;
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks.pool_ready_, ready());

conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will schedule binding the connection to request 2.
conn_pool_.expectEnableUpstreamReady();

callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

// Cause the connection to go away.
conn_pool_.expectClientCreate();
EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

conn_pool_.expectAndRunUpstreamReady();

EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected);

callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
response_headers.reset(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Test when upstream sends us 'connection: close'
*/
@@ -537,8 +616,11 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending);

// Finish r1, which gets r3 going.
conn_pool_.expectEnableUpstreamReady();
r3.expectNewStream();

r1.completeResponse(false);
conn_pool_.expectAndRunUpstreamReady();
r3.startRequest();

r2.completeResponse(false);
51 changes: 51 additions & 0 deletions test/integration/http_integration.cc
@@ -936,6 +936,57 @@ void HttpIntegrationTest::testIdleTimeoutWithTwoRequests() {
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 1);
}

void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() {
initialize();
fake_upstreams_[0]->set_allow_unexpected_disconnects(true);

codec_client_ = makeHttpConnection(lookupPort("http"));

// Request 1.
codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}},
1024, *response_);
waitForNextUpstreamRequest();

// Request 2.
IntegrationStreamDecoderPtr response2{new IntegrationStreamDecoder(*dispatcher_)};
IntegrationCodecClientPtr codec_client2 = makeHttpConnection(lookupPort("http"));
codec_client2->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}},
512, *response2);

// Response 1.
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(512, true);
fake_upstream_connection_->close();
Contributor
Sorry, I may not have had sufficient caffeine, but how are we making sure that the test Envoy is receiving the FIN before assigning the next request? If there were a context switch between encodeData and the close(), couldn't Envoy read the whole request and reassign the upstream before the close() occurs? We may need to unit test this rather than integration test it. Alternatively, we could do a raw TCP connection and send the response and stray data in one write call. I think that would guarantee no race, and I'd hope any activity on the delayed-use connection would cause it to be removed.

Member Author
The dispatcher is not running until the next wait* call, so on the very next line, waitForEndStream, the dispatcher runs and the connection reads the 512 bytes and the FIN in the same event. So the request is not scheduled before the close() occurs.

Also added a unit test in conn_pool_test.

Contributor
Yeah, I'd like to think this will be fine; we've just had a spate of macOS integration flakes due to slightly different network semantics. We can keep an eye out for test flakes and remove this if it causes problems, now that we have a unit test. @zuercher just so it's on his radar.

response_->waitForEndStream();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response_->complete());
EXPECT_STREQ("200", response_->headers().Status()->value().c_str());
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 1);
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 1);

// Response 2.
fake_upstream_connection_->waitForDisconnect();
fake_upstream_connection_.reset();
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(1024, true);
response2->waitForEndStream();
codec_client2->close();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response2->complete());
EXPECT_STREQ("200", response2->headers().Status()->value().c_str());
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 2);
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2);
}

void HttpIntegrationTest::testTwoRequests() {
initialize();

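For reference, the race the reviewers discuss above can also be reproduced from the upstream side without Envoy's test framework: write the tail of the response and close the socket in immediate succession, so the peer is likely to read the final bytes and the FIN in a single read event. A rough POSIX-socket sketch of such an upstream follows; the port, response text, and lack of error handling are illustrative assumptions, not part of this PR.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>

int main() {
  int listener = socket(AF_INET, SOCK_STREAM, 0);
  int one = 1;
  setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(9001);  // illustrative port
  bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  listen(listener, 1);

  // Blocks until a client (e.g. the proxy under test) connects.
  int conn = accept(listener, nullptr, nullptr);

  // Drain whatever request bytes arrived; enough for a sketch.
  char buf[4096];
  read(conn, buf, sizeof(buf));

  // Complete response with no "connection: close" header, followed by an
  // immediate close: the FIN rides right behind the last body bytes.
  std::string response = "HTTP/1.1 200 OK\r\ncontent-length: 2\r\n\r\nok";
  write(conn, response.data(), response.size());
  close(conn);
  close(listener);
  return 0;
}
```

Whether the data and the FIN actually land in one read event still depends on kernel and timing behavior, which is why the integration test above is backed by the deterministic unit test in conn_pool_test.cc.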
1 change: 1 addition & 0 deletions test/integration/http_integration.h
@@ -128,6 +128,7 @@ class HttpIntegrationTest : public BaseIntegrationTest {
void testIdleTimeoutBasic();
void testIdleTimeoutWithTwoRequests();
void testIdleTimerDisabled();
void testUpstreamDisconnectWithTwoRequests();
// HTTP/1 tests
void testBadFirstline();
void testMissingDelimiter();
4 changes: 4 additions & 0 deletions test/integration/integration_test.cc
@@ -112,6 +112,10 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) {

TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); }

TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) {
testUpstreamDisconnectWithTwoRequests();
}

TEST_P(IntegrationTest, RetryHittingBufferLimit) { testRetryHittingBufferLimit(); }

TEST_P(IntegrationTest, HittingDecoderFilterLimit) { testHittingDecoderFilterLimit(); }