Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream TCP connection buffer and read buffer limits (#150). #571

Merged
merged 5 commits into from
Mar 16, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/configuration/cluster_manager/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Cluster
"name": "...",
"type": "...",
"connect_timeout_ms": "...",
"per_connection_buffer_limit_bytes": "...",
"lb_type": "...",
"hosts": [],
"service_name": "...",
Expand Down Expand Up @@ -37,6 +38,10 @@ connect_timeout_ms
*(required, integer)* The timeout for new network connections to hosts in the cluster specified
in milliseconds.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the cluster's connections read and write buffers.
If unspecified, an implementation defined default is applied (1MiB).

lb_type
*(required, string)* The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
when picking a host in the cluster. Possible options are *round_robin*, *least_request*,
Expand Down Expand Up @@ -172,7 +177,6 @@ outlier_detection
Each of the above configuration values can be overridden via
:ref:`runtime values <config_cluster_manager_cluster_runtime_outlier_detection>`.


.. toctree::
:hidden:

Expand Down
5 changes: 3 additions & 2 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Each individual listener configuration has the following format:
"ssl_context": "{...}",
"bind_to_port": "...",
"use_proxy_proto": "...",
"use_original_dst": "..."
"use_original_dst": "...",
"per_connection_buffer_limit_bytes": "..."
}

port
Expand Down Expand Up @@ -54,7 +55,7 @@ use_original_dst

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the listener's new connection read and write buffers.
If unspecified, an implementation defined default is applied (1MB).
If unspecified, an implementation defined default is applied (1MiB).

.. toctree::
:hidden:
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* processing pipeline.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;

/**
* Get the value set with setReadBufferLimit.
*/
virtual uint32_t readBufferLimit() const PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ class ClusterInfo {
*/
virtual std::chrono::milliseconds connectTimeout() const PURE;

/**
* @return soft limit on size of the cluster's connections read and write buffers.
*/
virtual uint32_t perConnectionBufferLimitBytes() const PURE;

/**
* @return uint64_t features supported by the cluster. @see Features.
*/
Expand Down
5 changes: 5 additions & 0 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,11 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF(
"minimum" : 0,
"exclusiveMinimum" : true
},
"per_connection_buffer_limit_bytes" : {
"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
},
"lb_type" : {
"type" : "string",
"enum" : ["round_robin", "least_request", "random", "ring_hash"]
Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ConnectionImpl : public virtual Connection,
State state() override;
void write(Buffer::Instance& data) override;
void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; }
uint32_t readBufferLimit() const override { return read_buffer_limit_; }

// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand Down
19 changes: 11 additions & 8 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc
return {createConnection(dispatcher, *cluster_, address_), shared_from_this()};
}

Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher,
const ClusterInfo& cluster,
Network::Address::InstancePtr address) {
if (cluster.sslContext()) {
return Network::ClientConnectionPtr{
dispatcher.createSslClientConnection(*cluster.sslContext(), address)};
} else {
return Network::ClientConnectionPtr{dispatcher.createClientConnection(address)};
Network::ClientConnectionPtr
HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
Network::Address::InstancePtr address) const {
Network::ClientConnectionPtr connection =
cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address)
: dispatcher.createClientConnection(address);
if (cluster_) {
Copy link
Member

@mattklein123 mattklein123 Mar 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall why this function is/was static, but you can use the passed in cluster, you don't need to use cluster_ (or check if nullptr, I don't think it can ever be nullptr).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's nicer, but there's a bug (probably just in tests?) that allows cluster_ to be null:

Program received signal SIGSEGV, Segmentation fault.
0x0000000001dfbbf8 in Upstream::HostImpl::createConnection (this=0x35cc8c0, dispatcher=..., cluster=..., address=...) at /source/source/common/upstream/upstream_impl.cc:36
36 connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes());
#0 0x0000000001dfbbf8 in Upstream::HostImpl::createConnection (this=0x35cc8c0, dispatcher=..., cluster=..., address=...) at /source/source/common/upstream/upstream_impl.cc:36
#1 0x0000000001dfba6d in Upstream::HostImpl::createConnection (this=0x35cc8c0, dispatcher=...) at /source/source/common/upstream/upstream_impl.cc:27
#2 0x0000000001dd490a in Upstream::HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onInterval (this=0x33f3800) at /source/source/common/upstream/health_checker_impl.cc:227
#3 0x0000000001dd455a in Upstream::HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSession (this=0x33f3800, parent=..., host=...) at /source/source/common/upstream/health_checker_impl.cc:193
#4 0x0000000001dd43d7 in Upstream::HttpHealthCheckerImpl::start (this=0x34b3400) at /source/source/common/upstream/health_checker_impl.cc:186
#5 0x000000000186d2d6 in Upstream::HttpHealthCheckerImplTest_Success_Test::TestBody (this=0x35e6c00) at /source/test/common/upstream/health_checker_impl_test.cc:169
#6 0x000000000205bcd6 in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::)(), char const) ()
#7 0x0000000002056a18 in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::)(), char const) ()
#8 0x000000000203d965 in testing::Test::Run() ()
#9 0x000000000203e1f1 in testing::TestInfo::Run() ()
#10 0x000000000203e882 in testing::TestCase::Run() ()
#11 0x000000000204502d in testing::internal::UnitTestImpl::RunAllTests() ()
#12 0x000000000205cdd6 in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::)(), char const) ()
#13 0x00000000020576da in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::)(), char const) ()
#14 0x0000000002043cb7 in testing::UnitTest::Run() ()
#15 0x00000000019845c4 in RUN_ALL_TESTS () at /thirdparty_build/include/gtest/gtest.h:2233
#16 0x0000000001983709 in main (argc=1, argv=0x7fffffffec18) at /source/test/main.cc:21
Starting epoch 0

Looking into it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I'm guessing that's just stupid test bug.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out it was the connection, not cluster that could be null, and that we should still test for.

connection->setReadBufferLimit(cluster_->perConnectionBufferLimitBytes());
}
return connection;
}

void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(100U, new_weight)); }
Expand All @@ -60,6 +61,8 @@ ClusterInfoImpl::ClusterInfoImpl(const Json::Object& config, Runtime::Loader& ru
: runtime_(runtime), name_(config.getString("name")),
max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)),
connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))),
per_connection_buffer_limit_bytes_(
config.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)),
stats_scope_(stats.createScope(fmt::format("cluster.{}.", name_))),
stats_(generateStats(*stats_scope_)), features_(parseFeatures(config)),
http_codec_options_(Http::Utility::parseCodecOptions(config)),
Expand Down
10 changes: 7 additions & 3 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ class HostImpl : public HostDescriptionImpl,
void weight(uint32_t new_weight) override;

protected:
static Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher,
const ClusterInfo& cluster,
Network::Address::InstancePtr address);
Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher,
const ClusterInfo& cluster,
Network::Address::InstancePtr address) const;

private:
std::atomic<uint64_t> health_flags_{};
Expand Down Expand Up @@ -160,6 +160,9 @@ class ClusterInfoImpl : public ClusterInfo {

// Upstream::ClusterInfo
std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
uint32_t perConnectionBufferLimitBytes() const override {
return per_connection_buffer_limit_bytes_;
}
uint64_t features() const override { return features_; }
uint64_t httpCodecOptions() const override { return http_codec_options_; }
LoadBalancerType lbType() const override { return lb_type_; }
Expand Down Expand Up @@ -189,6 +192,7 @@ class ClusterInfoImpl : public ClusterInfo {
const std::string name_;
const uint64_t max_requests_per_connection_;
const std::chrono::milliseconds connect_timeout_;
const uint32_t per_connection_buffer_limit_bytes_;
Stats::ScopePtr stats_scope_;
mutable ClusterStats stats_;
Ssl::ClientContextPtr ssl_ctx_;
Expand Down
17 changes: 17 additions & 0 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) {
NiceMock<Http::MockStreamDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192));
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests a request that generates a new connection, completes, and then a second request that uses
* the same connection.
Expand Down
20 changes: 20 additions & 0 deletions test/common/http/http2/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) {
expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192));

ActiveTestRequest r1(*this, 0);
EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true));
r1.callbacks_.outer_encoder_->encodeHeaders(HeaderMapImpl{}, true);
expectClientConnect(0);
EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true));
r1.inner_decoder_->decodeHeaders(HeaderMapPtr{new HeaderMapImpl{}}, true);

test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
EXPECT_CALL(*this, onClientDestroy());
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http2ConnPoolImplTest, RequestAndResponse) {
InSequence s;

Expand Down
1 change: 1 addition & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
1 change: 1 addition & 0 deletions test/common/ssl/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class SslReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
28 changes: 28 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,34 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) {
factory_.tls_.shutdownThread();
}

/**
* Test that buffer limits are set on new TCP connections.
*/
TEST_F(ClusterManagerImplTest, VerifyBufferLimits) {
std::string json = R"EOF(
{
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"per_connection_buffer_limit_bytes": 8192,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
}]
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);
create(*loader);
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, setReadBufferLimit(8192));
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection));
auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1");
EXPECT_EQ(connection, conn_data.connection_.get());
factory_.tls_.shutdownThread();
}

TEST_F(ClusterManagerImplTest, ShutdownOrder) {
std::string json = R"EOF(
{
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class MockConnection : public Connection, public MockConnectionBase {
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());
};

/**
Expand Down Expand Up @@ -89,6 +90,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());

// Network::ClientConnection
MOCK_METHOD0(connect, void());
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MockClusterInfo : public ClusterInfo {

// Upstream::ClusterInfo
MOCK_CONST_METHOD0(connectTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(perConnectionBufferLimitBytes, uint32_t());
MOCK_CONST_METHOD0(features, uint64_t());
MOCK_CONST_METHOD0(httpCodecOptions, uint64_t());
MOCK_CONST_METHOD0(lbType, LoadBalancerType());
Expand Down
35 changes: 35 additions & 0 deletions test/server/configuration_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,41 @@ TEST(ConfigurationImplTest, SetListenerPerConnectionBufferLimit) {
EXPECT_EQ(8192U, config.listeners().back()->perConnectionBufferLimitBytes());
}

TEST(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) {
std::string json = R"EOF(
{
"listeners" : [],
"cluster_manager": {
"clusters": [
{
"name": "test_cluster",
"type": "static",
"connect_timeout_ms": 1,
"per_connection_buffer_limit_bytes": 8192,
"lb_type": "round_robin",
"hosts": []
}
]
}
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);

NiceMock<Server::MockInstance> server;
MainImpl config(server);
config.initialize(*loader);

ASSERT_EQ(1U, config.clusterManager().clusters().count("test_cluster"));
EXPECT_EQ(8192U, config.clusterManager()
.clusters()
.find("test_cluster")
->second.get()
.info()
->perConnectionBufferLimitBytes());
server.thread_local_.shutdownThread();
}

TEST(ConfigurationImplTest, BadListenerConfig) {
std::string json = R"EOF(
{
Expand Down