From 408576935f8e48d3dc9216aaa63661ae50d3fb66 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 17 Nov 2016 13:18:37 -0800 Subject: [PATCH 1/3] perf: simplify connection buffer stats We spend a considerable amount of time computing instantaneous connection buffer stats and firing callbacks. Currently we only use this information for stats purposes. This commit makes the buffer stats eventually consistent and much more streamlined. Eventually, we can provide direct accessors vs. callbacks if we need to inspect buffer information in order to apply back pressure. --- include/envoy/buffer/buffer.h | 14 -- include/envoy/network/connection.h | 22 ++-- source/common/buffer/buffer_impl.cc | 19 --- source/common/buffer/buffer_impl.h | 11 -- source/common/filter/auth/client_ssl.h | 1 - source/common/filter/ratelimit.h | 1 - source/common/filter/tcp_proxy.cc | 39 +++--- source/common/filter/tcp_proxy.h | 21 +-- source/common/http/codec_client.h | 5 +- source/common/http/conn_manager_impl.cc | 13 +- source/common/http/conn_manager_impl.h | 2 - source/common/http/http1/conn_pool.cc | 14 +- source/common/http/http1/conn_pool.h | 2 - source/common/http/http2/conn_pool.cc | 14 +- source/common/http/http2/conn_pool.h | 1 - source/common/mongo/proxy.h | 1 - source/common/network/connection_impl.cc | 128 +++++++++++++------ source/common/network/connection_impl.h | 36 +++++- source/common/network/utility.cc | 21 --- source/common/network/utility.h | 8 -- source/common/ssl/connection_impl.cc | 18 +-- source/common/ssl/connection_impl.h | 4 +- source/common/stats/statsd.h | 1 - source/common/upstream/health_checker_impl.h | 2 - source/server/connection_handler.h | 2 - test/common/network/connection_impl_test.cc | 51 +++++++- test/integration/fake_upstream.h | 1 - test/integration/integration.h | 2 - test/mocks/network/mocks.h | 3 +- test/mocks/stats/mocks.cc | 3 + test/mocks/stats/mocks.h | 15 +++ 31 files changed, 248 insertions(+), 227 deletions(-) diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 1dc584b637e1..719d63a060d9 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -12,13 +12,6 @@ struct RawSlice { uint64_t len_; }; -/** - * Buffer change callback. - * @param old_size supplies the size of the buffer prior to the change. - * @param data supplies how much data was added or removed. - */ -typedef std::function Callback; - /** * A basic buffer abstraction. */ @@ -118,13 +111,6 @@ class Instance { */ virtual ssize_t search(const void* data, uint64_t size, size_t start) const PURE; - /** - * Set a buffer change callback. Only a single callback can be set at a time. The callback - * is invoked inline with buffer changes. - * @param callback supplies the callback to set. Pass nullptr to clear the callback. - */ - virtual void setCallback(Callback callback) PURE; - /** * Write the buffer out to a file descriptor. * @param fd supplies the descriptor to write to. diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index 3c3aa3dc736d..50bdc0b97f6a 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -34,14 +34,6 @@ class ConnectionCallbacks { public: virtual ~ConnectionCallbacks() {} - /** - * Callback for connection buffer changes. - * @param type supplies which buffer has changed. - * @param old_size supplies the original size of the buffer. - * @param delta supplies how much data was added or removed from the buffer. - */ - virtual void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta) PURE; - /** * Callback for connection events. * @param events supplies the ConnectionEvent events that occurred as a bitmask. @@ -64,6 +56,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager { public: enum class State { Open, Closing, Closed }; + struct BufferStats { + Stats::Counter& read_total_; + Stats::Gauge& read_current_; + Stats::Counter& write_total_; + Stats::Gauge& write_current_; + }; + virtual ~Connection() {} /** @@ -116,6 +115,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager { */ virtual const std::string& remoteAddress() PURE; + /** + * Set the buffer stats to update when the connection's read/write buffers change. Note that + * for performance reasons these stats are eventually consistent and may not always accurately + * represent the buffer contents at any given point in time. + */ + virtual void setBufferStats(const BufferStats& stats) PURE; + /** * @return the SSL connection data if this is an SSL connection, or nullptr if it is not. */ diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index b1bc57192077..d3a09d2e932a 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -6,10 +6,6 @@ namespace Buffer { -const evbuffer_cb_func OwnedImpl::buffer_cb_ = - [](evbuffer*, const evbuffer_cb_info* info, void* arg) - -> void { static_cast(arg)->onBufferChange(*info); }; - void OwnedImpl::add(const void* data, uint64_t size) { evbuffer_add(buffer_.get(), data, size); } void OwnedImpl::add(const std::string& data) { @@ -78,10 +74,6 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) { UNREFERENCED_PARAMETER(rc); } -void OwnedImpl::onBufferChange(const evbuffer_cb_info& info) { - cb_(info.orig_size, info.n_added - info.n_deleted); -} - int OwnedImpl::read(int fd, uint64_t max_length) { return evbuffer_read(buffer_.get(), fd, max_length); } @@ -108,17 +100,6 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start) const { return result_ptr.pos; } -void OwnedImpl::setCallback(Callback callback) { - ASSERT(!callback || !cb_); - if (callback) { - evbuffer_add_cb(buffer_.get(), buffer_cb_, this); - cb_ = callback; - } else { - evbuffer_remove_cb(buffer_.get(), buffer_cb_, this); - cb_ = nullptr; - } -} - int OwnedImpl::write(int fd) { return evbuffer_write(buffer_.get(), fd); } OwnedImpl::OwnedImpl() : buffer_(evbuffer_new()) {} diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 7bba7d3c54db..63c7d02ddee1 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -4,10 +4,6 @@ #include "common/event/libevent.h" -// Forward decls to avoid leaking libevent headers to rest of program. -struct evbuffer_cb_info; -typedef void (*evbuffer_cb_func)(evbuffer* buffer, const evbuffer_cb_info* info, void* arg); - namespace Buffer { /** @@ -34,17 +30,10 @@ class OwnedImpl : public Instance { int read(int fd, uint64_t max_length) override; uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; ssize_t search(const void* data, uint64_t size, size_t start) const override; - void setCallback(Callback callback) override; int write(int fd) override; private: - void onBufferChange(const evbuffer_cb_info& info); - - static const evbuffer_cb_func buffer_cb_; // Static callback used for all evbuffer callbacks. - // This allows us to add/remove by value. - Event::Libevent::BufferPtr buffer_; - Callback cb_; // The per buffer callback. Invoked via the buffer_cb_ static thunk. }; } // Buffer diff --git a/source/common/filter/auth/client_ssl.h b/source/common/filter/auth/client_ssl.h index faae983ab0a0..087479cbf119 100644 --- a/source/common/filter/auth/client_ssl.h +++ b/source/common/filter/auth/client_ssl.h @@ -110,7 +110,6 @@ class Instance : public Network::ReadFilter, public Network::ConnectionCallbacks } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; private: diff --git a/source/common/filter/ratelimit.h b/source/common/filter/ratelimit.h index 0f586d1848ac..f632c46072e3 100644 --- a/source/common/filter/ratelimit.h +++ b/source/common/filter/ratelimit.h @@ -74,7 +74,6 @@ class Instance : public Network::ReadFilter, } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; // RateLimit::RequestCallbacks diff --git a/source/common/filter/tcp_proxy.cc b/source/common/filter/tcp_proxy.cc index a5fdf280963d..a1beabb0d3b5 100644 --- a/source/common/filter/tcp_proxy.cc +++ b/source/common/filter/tcp_proxy.cc @@ -45,6 +45,16 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor POOL_GAUGE_PREFIX(store, final_prefix))}; } +void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; + conn_log_info("new tcp proxy session", read_callbacks_->connection()); + read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); + read_callbacks_->connection().setBufferStats({config_->stats().downstream_cx_rx_bytes_total_, + config_->stats().downstream_cx_rx_bytes_buffered_, + config_->stats().downstream_cx_tx_bytes_total_, + config_->stats().downstream_cx_tx_bytes_buffered_}); +} + Network::FilterStatus TcpProxy::initializeUpstreamConnection() { Upstream::ResourceManager& upstream_cluster_resource_manager = cluster_manager_.get(config_->clusterName()) @@ -68,6 +78,11 @@ Network::FilterStatus TcpProxy::initializeUpstreamConnection() { upstream_connection_->addReadFilter(upstream_callbacks_); upstream_connection_->addConnectionCallbacks(*upstream_callbacks_); + upstream_connection_->setBufferStats( + {read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_buffered_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_}); upstream_connection_->connect(); upstream_connection_->noDelay(true); @@ -103,18 +118,6 @@ Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) { return Network::FilterStatus::StopIteration; } -void TcpProxy::onDownstreamBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - if (type == Network::ConnectionBufferType::Write) { - if (delta > 0) { - config_->stats().downstream_cx_tx_bytes_total_.add(delta); - config_->stats().downstream_cx_tx_bytes_buffered_.add(delta); - } else { - config_->stats().downstream_cx_tx_bytes_buffered_.sub(std::abs(delta)); - } - } -} - void TcpProxy::onDownstreamEvent(uint32_t event) { if ((event & Network::ConnectionEvent::RemoteClose || event & Network::ConnectionEvent::LocalClose) && @@ -127,18 +130,6 @@ void TcpProxy::onDownstreamEvent(uint32_t event) { } } -void TcpProxy::onUpstreamBufferChange(Network::ConnectionBufferType type, uint64_t, int64_t delta) { - if (type == Network::ConnectionBufferType::Write) { - if (delta > 0) { - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_.add(delta); - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.add(delta); - } else { - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.sub( - std::abs(delta)); - } - } -} - void TcpProxy::onUpstreamData(Buffer::Instance& data) { read_callbacks_->connection().write(data); ASSERT(0 == data.length()); diff --git a/source/common/filter/tcp_proxy.h b/source/common/filter/tcp_proxy.h index a31413a6eed3..7730c73d6382 100644 --- a/source/common/filter/tcp_proxy.h +++ b/source/common/filter/tcp_proxy.h @@ -17,6 +17,8 @@ namespace Filter { */ // clang-format off #define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \ + COUNTER(downstream_cx_rx_bytes_total) \ + GAUGE (downstream_cx_rx_bytes_buffered) \ COUNTER(downstream_cx_tx_bytes_total) \ GAUGE (downstream_cx_tx_bytes_buffered) // clang-format on @@ -61,22 +63,13 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggableconnection()); - read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); - } + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; private: struct DownstreamCallbacks : public Network::ConnectionCallbacks { DownstreamCallbacks(TcpProxy& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override { - parent_.onDownstreamBufferChange(type, old_size, delta); - } - void onEvent(uint32_t event) override { parent_.onDownstreamEvent(event); } TcpProxy& parent_; @@ -87,11 +80,6 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable, */ StreamEncoder& newStream(StreamDecoder& response_decoder); + void setBufferStats(const Network::Connection::BufferStats& stats) { + connection_->setBufferStats(stats); + } + void setCodecClientCallbacks(CodecClientCallbacks& callbacks) { codec_client_callbacks_ = &callbacks; } @@ -180,7 +184,6 @@ class CodecClient : Logger::Loggable, void onData(Buffer::Instance& data); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; std::list active_requests_; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 9e4a9e20dc0e..fbc8f8eb1067 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -57,6 +57,11 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal [this]() -> void { onIdleTimeout(); }); idle_timer_->enableTimer(config_.idleTimeout().value()); } + + read_callbacks_->connection().setBufferStats({stats_.named_.downstream_cx_rx_bytes_total_, + stats_.named_.downstream_cx_rx_bytes_buffered_, + stats_.named_.downstream_cx_tx_bytes_total_, + stats_.named_.downstream_cx_tx_bytes_buffered_}); } ConnectionManagerImpl::~ConnectionManagerImpl() { @@ -189,14 +194,6 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data) { return Network::FilterStatus::StopIteration; } -void ConnectionManagerImpl::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats(type, delta, stats_.named_.downstream_cx_rx_bytes_total_, - stats_.named_.downstream_cx_rx_bytes_buffered_, - stats_.named_.downstream_cx_tx_bytes_total_, - stats_.named_.downstream_cx_tx_bytes_buffered_); -} - void ConnectionManagerImpl::resetAllStreams() { while (!streams_.empty()) { // Mimic a downstream reset in this case. diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 7a4a6cbfa6af..e2e05268abb3 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -215,8 +215,6 @@ class ConnectionManagerImpl : Logger::Loggable, StreamDecoder& newStream(StreamEncoder& response_encoder) override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override; void onEvent(uint32_t events) override; private: diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index e65d6f4a0672..eb5e42425145 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -276,6 +276,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan(); connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout()); parent_.host_->cluster().resourceManager(parent_.priority_).connections().inc(); + + codec_client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_}); } ConnPoolImpl::ActiveClient::~ActiveClient() { @@ -285,15 +290,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() { parent_.host_->cluster().resourceManager(parent_.priority_).connections().dec(); } -void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats( - type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_); -} - void ConnPoolImpl::ActiveClient::onConnectTimeout() { // We just close the client at this point. This will result in both a timeout and a connect // failure and will fold into all the normal connect failure logic. diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index dc9fde1feace..ec036f868fa8 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -70,8 +70,6 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onConnectTimeout(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override; void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } ConnPoolImpl& parent_; diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 0b3ebdb95d03..d60b7d6a8a3a 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -226,6 +226,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.host_->cluster().stats().upstream_cx_active_.inc(); parent_.host_->cluster().stats().upstream_cx_http2_total_.inc(); conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan(); + + client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_}); } ConnPoolImpl::ActiveClient::~ActiveClient() { @@ -234,15 +239,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() { conn_length_->complete(); } -void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats( - type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_); -} - CodecClientPtr ProdConnPoolImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) { CodecClientStats stats{host_->cluster().stats().upstream_cx_protocol_error_}; CodecClientPtr codec{new CodecClientProd(CodecClient::Type::HTTP2, std::move(data.connection_), diff --git a/source/common/http/http2/conn_pool.h b/source/common/http/http2/conn_pool.h index a59d30a38cf1..be450e194c75 100644 --- a/source/common/http/http2/conn_pool.h +++ b/source/common/http/http2/conn_pool.h @@ -37,7 +37,6 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onConnectTimeout() { parent_.onConnectTimeout(*this); } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t, int64_t delta) override; void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } // CodecClientCallbacks diff --git a/source/common/mongo/proxy.h b/source/common/mongo/proxy.h index 6b68ba3922ba..3985b944d9e7 100644 --- a/source/common/mongo/proxy.h +++ b/source/common/mongo/proxy.h @@ -104,7 +104,6 @@ class ProxyFilter : public Network::Filter, // Network::ConnectionCallbacks void onEvent(uint32_t event) override; - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} private: struct ActiveQuery { diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 9155fac9aaf5..9b987137a40a 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -12,6 +12,24 @@ namespace Network { +void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total, + uint64_t& previous_total, Stats::Counter& stat_total, + Stats::Gauge& stat_current) { + if (delta) { + stat_total.add(delta); + } + + if (new_total != previous_total) { + if (new_total > previous_total) { + stat_current.add(new_total - previous_total); + } else { + stat_current.sub(previous_total - new_total); + } + + previous_total = new_total; + } +} + std::atomic ConnectionImpl::next_global_id_; ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, @@ -25,13 +43,6 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, file_event_ = dispatcher_.createFileEvent(fd_, [this](uint32_t events) -> void { onFileEvent(events); }); - - read_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void { - onBufferChange(ConnectionBufferType::Read, old_size, delta); - }); - write_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void { - onBufferChange(ConnectionBufferType::Write, old_size, delta); - }); } ConnectionImpl::~ConnectionImpl() { @@ -93,20 +104,10 @@ void ConnectionImpl::closeSocket(uint32_t close_type) { conn_log_debug("closing socket: {}", *this, close_type); - // Drain input and output buffers so that callbacks get fired. This does not happen automatically - // as part of destruction. - uint64_t current_read_buffer_length = read_buffer_.length(); - read_buffer_.setCallback(nullptr); - if (current_read_buffer_length > 0) { - onBufferChange(ConnectionBufferType::Read, current_read_buffer_length, - -current_read_buffer_length); - } - uint64_t current_write_buffer_length = write_buffer_.length(); - write_buffer_.setCallback(nullptr); - if (current_write_buffer_length > 0) { - onBufferChange(ConnectionBufferType::Write, current_write_buffer_length, - -current_write_buffer_length); - } + // Drain input and output buffers. + updateReadBufferStats(0, 0); + updateWriteBufferStats(0, 0); + buffer_stats_.reset(); file_event_.reset(); ::close(fd_); @@ -148,18 +149,12 @@ void ConnectionImpl::noDelay(bool enable) { uint64_t ConnectionImpl::id() { return id_; } -void ConnectionImpl::onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta) { - for (ConnectionCallbacks* callbacks : callbacks_) { - callbacks->onBufferChange(type, old_size, delta); - } -} - -void ConnectionImpl::onRead() { +void ConnectionImpl::onRead(uint64_t read_buffer_size) { if (!(state_ & InternalState::ReadEnabled)) { return; } - if (read_buffer_.length() == 0) { + if (read_buffer_size == 0) { return; } @@ -243,7 +238,9 @@ void ConnectionImpl::onFileEvent(uint32_t events) { } } -ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { +ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() { + PostIoAction action; + uint64_t bytes_read = 0; do { // 16K read is arbitrary. IIRC, libevent will currently clamp this to 4K. libevent will also // use an ioctl() before every read to figure out how much data there is to read. @@ -254,38 +251,50 @@ ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { // Remote close. Might need to raise data before raising close. if (rc == 0) { - return PostIoAction::Close; + action = PostIoAction::Close; + break; } // Remote error (might be no data). if (rc == -1) { conn_log_trace("read error: {}", *this, errno); if (errno == EAGAIN) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; } else { - return PostIoAction::Close; + action = PostIoAction::Close; } + + break; + } else { + bytes_read += rc; } } while (true); + + return {action, bytes_read}; } void ConnectionImpl::onReadReady() { ASSERT(!(state_ & InternalState::Connecting)); - PostIoAction action = doReadFromSocket(); - onRead(); + IoResult result = doReadFromSocket(); + uint64_t new_buffer_size = read_buffer_.length(); + updateReadBufferStats(result.bytes_processed_, new_buffer_size); + onRead(new_buffer_size); // The read callback may have already closed the connection. - if (action == PostIoAction::Close) { + if (result.action_ == PostIoAction::Close) { conn_log_debug("remote close", *this); closeSocket(ConnectionEvent::RemoteClose); } } -ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { +ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { + PostIoAction action; + uint64_t bytes_written = 0; do { if (write_buffer_.length() == 0) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; + break; } int rc = write_buffer_.write(fd_); @@ -293,12 +302,18 @@ ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { if (rc == -1) { conn_log_trace("write error: {}", *this, errno); if (errno == EAGAIN) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; } else { - return PostIoAction::Close; + action = PostIoAction::Close; } + + break; + } else { + bytes_written += rc; } } while (true); + + return {action, bytes_written}; } void ConnectionImpl::onConnected() { raiseEvents(ConnectionEvent::Connected); } @@ -324,12 +339,16 @@ void ConnectionImpl::onWriteReady() { } } - if (doWriteToSocket() == PostIoAction::Close) { + IoResult result = doWriteToSocket(); + uint64_t new_buffer_size = write_buffer_.length(); + updateWriteBufferStats(result.bytes_processed_, new_buffer_size); + + if (result.action_ == PostIoAction::Close) { // It is possible (though unlikely) for the connection to have already been closed during the // write callback. This can happen if we manage to complete the SSL handshake in the write // callback, raise a connected event, and close the connection. closeSocket(ConnectionEvent::RemoteClose); - } else if ((state_ & InternalState::CloseWithFlush) && write_buffer_.length() == 0) { + } else if ((state_ & InternalState::CloseWithFlush) && new_buffer_size == 0) { conn_log_debug("write flush complete", *this); closeSocket(ConnectionEvent::LocalClose); } @@ -353,6 +372,31 @@ void ConnectionImpl::doConnect(const sockaddr* addr, socklen_t addrlen) { } } +void ConnectionImpl::setBufferStats(const BufferStats& stats) { + ASSERT(!buffer_stats_); + buffer_stats_.reset(new BufferStats(stats)); +} + +void ConnectionImpl::updateReadBufferStats(uint64_t num_read, uint64_t new_size) { + if (!buffer_stats_) { + return; + } + + ConnectionImplUtility::updateBufferStats(num_read, new_size, last_read_buffer_size_, + buffer_stats_->read_total_, + buffer_stats_->read_current_); +} + +void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_size) { + if (!buffer_stats_) { + return; + } + + ConnectionImplUtility::updateBufferStats(num_written, new_size, last_write_buffer_size_, + buffer_stats_->write_total_, + buffer_stats_->write_current_); +} + ClientConnectionImpl::ClientConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& url) : ConnectionImpl(dispatcher, fd, url) {} diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 7656cbb23c29..f0e3151728af 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -11,6 +11,25 @@ namespace Network { +/** + * Utility functions for the connection implementation. + */ +class ConnectionImplUtility { +public: + /** + * Update the buffer stats for a connection. + * @param delta supplies the data read/written. + * @param new_total supplies the final total buffer size. + * @param previous_total supplies the previous final total buffer size. previous_total will be + * updated to new_total when the call is complete. + * @param stat_total supplies the counter to increment with the delta. + * @param stat_current supplies the guage that should be updated with the delta of previous_total + * and new_total. + */ + static void updateBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total, + Stats::Counter& stat_total, Stats::Gauge& stat_current); +}; + /** * Implementation of Network::Connection. */ @@ -37,6 +56,7 @@ class ConnectionImpl : public virtual Connection, void readDisable(bool disable) override; bool readEnabled() override; const std::string& remoteAddress() override { return remote_address_; } + void setBufferStats(const BufferStats& stats) override; Ssl::Connection* ssl() override { return nullptr; } State state() override; void write(Buffer::Instance& data) override; @@ -48,6 +68,11 @@ class ConnectionImpl : public virtual Connection, protected: enum class PostIoAction { Close, KeepOpen }; + struct IoResult { + PostIoAction action_; + uint64_t bytes_processed_; + }; + virtual void closeSocket(uint32_t close_type); void doConnect(const sockaddr* addr, socklen_t addrlen); void raiseEvents(uint32_t events); @@ -67,14 +92,16 @@ class ConnectionImpl : public virtual Connection, }; // clang-format on - virtual PostIoAction doReadFromSocket(); - virtual PostIoAction doWriteToSocket(); + virtual IoResult doReadFromSocket(); + virtual IoResult doWriteToSocket(); void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta); virtual void onConnected(); void onFileEvent(uint32_t events); - void onRead(); + void onRead(uint64_t read_buffer_size); void onReadReady(); void onWriteReady(); + void updateReadBufferStats(uint64_t num_read, uint64_t new_size); + void updateWriteBufferStats(uint64_t num_written, uint64_t new_size); static std::atomic next_global_id_; @@ -85,6 +112,9 @@ class ConnectionImpl : public virtual Connection, std::list callbacks_; uint32_t state_{InternalState::ReadEnabled}; Buffer::Instance* current_write_buffer_{}; + uint64_t last_read_buffer_size_{}; + uint64_t last_write_buffer_size_{}; + std::unique_ptr buffer_stats_; }; /** diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 6fbc182e7e92..a5b7636f8496 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -70,27 +70,6 @@ bool IpWhiteList::contains(const std::string& remote_address) const { const std::string Utility::TCP_SCHEME = "tcp://"; const std::string Utility::UNIX_SCHEME = "unix://"; -void Utility::updateBufferStats(ConnectionBufferType type, int64_t delta, Stats::Counter& rx_total, - Stats::Gauge& rx_buffered, Stats::Counter& tx_total, - Stats::Gauge& tx_buffered) { - if (type == ConnectionBufferType::Read) { - if (delta > 0) { - rx_total.add(delta); - rx_buffered.add(delta); - } else { - rx_buffered.sub(std::abs(delta)); - } - } else { - ASSERT(type == ConnectionBufferType::Write); - if (delta > 0) { - tx_total.add(delta); - tx_buffered.add(delta); - } else { - tx_buffered.sub(std::abs(delta)); - } - } -} - AddrInfoPtr Utility::resolveTCP(const std::string& host, uint32_t port) { addrinfo addrinfo_hints; memset(&addrinfo_hints, 0, sizeof(addrinfo_hints)); diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 3b0f61c03371..4d243e719076 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -37,14 +37,6 @@ class Utility { static const std::string TCP_SCHEME; static const std::string UNIX_SCHEME; - /** - * Update buffering stats for a connection. Meant to be paired with - * ConnectionCallbacks::onBufferChange(). - */ - static void updateBufferStats(ConnectionBufferType type, int64_t delta, Stats::Counter& rx_total, - Stats::Gauge& rx_buffered, Stats::Counter& tx_total, - Stats::Gauge& tx_buffered); - /** * Resolve a TCP address. * @param host supplies the host name. diff --git a/source/common/ssl/connection_impl.cc b/source/common/ssl/connection_impl.cc index 799bc98e8381..901e49e4268a 100644 --- a/source/common/ssl/connection_impl.cc +++ b/source/common/ssl/connection_impl.cc @@ -32,16 +32,17 @@ ConnectionImpl::~ConnectionImpl() { filter_manager_.destroyFilters(); } -Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { +Network::ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() { if (!handshake_complete_) { PostIoAction action = doHandshake(); if (action == PostIoAction::Close || !handshake_complete_) { - return action; + return {action, 0}; } } bool keep_reading = true; PostIoAction action = PostIoAction::KeepOpen; + uint64_t bytes_read = 0; while (keep_reading) { // We use 2 slices here so that we can use the remainder of an existing buffer chain element // if there is extra space. 16K read is arbitrary and can be tuned later. @@ -54,6 +55,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { if (rc > 0) { slices[i].len_ = rc; slices_to_commit++; + bytes_read += rc; } else { keep_reading = false; int err = SSL_get_error(ssl_.get(), rc); @@ -77,7 +79,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { } } - return action; + return {action, bytes_read}; } Network::ConnectionImpl::PostIoAction ConnectionImpl::doHandshake() { @@ -117,16 +119,16 @@ void ConnectionImpl::drainErrorQueue() { } } -Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { +Network::ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { if (!handshake_complete_) { PostIoAction action = doHandshake(); if (action == PostIoAction::Close || !handshake_complete_) { - return action; + return {action, 0}; } } if (write_buffer_.length() == 0) { - return PostIoAction::KeepOpen; + return {PostIoAction::KeepOpen, 0}; } uint64_t num_slices = write_buffer_.getRawSlices(nullptr, 0); @@ -154,7 +156,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { // Renegotiation has started. We don't handle renegotiation so just fall through. default: drainErrorQueue(); - return PostIoAction::Close; + return {PostIoAction::Close, bytes_written}; } break; @@ -165,7 +167,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { write_buffer_.drain(bytes_written); } - return PostIoAction::KeepOpen; + return {PostIoAction::KeepOpen, bytes_written}; } void ConnectionImpl::onConnected() { ASSERT(!handshake_complete_); } diff --git a/source/common/ssl/connection_impl.h b/source/common/ssl/connection_impl.h index ac37ce2baf17..0f38d29275b9 100644 --- a/source/common/ssl/connection_impl.h +++ b/source/common/ssl/connection_impl.h @@ -27,8 +27,8 @@ class ConnectionImpl : public Network::ConnectionImpl, public Connection { // Network::ConnectionImpl void closeSocket(uint32_t close_type) override; - PostIoAction doReadFromSocket() override; - PostIoAction doWriteToSocket() override; + IoResult doReadFromSocket() override; + IoResult doWriteToSocket() override; void onConnected() override; ContextImpl& ctx_; diff --git a/source/common/stats/statsd.h b/source/common/stats/statsd.h index 650d1faca08f..899f86634d3f 100644 --- a/source/common/stats/statsd.h +++ b/source/common/stats/statsd.h @@ -92,7 +92,6 @@ class TcpStatsdSink : public Sink { void shutdown() override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; TcpStatsdSink& parent_; diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index a9e9984ed41a..83be6bc73891 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -136,7 +136,6 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { void onResetStream(Http::StreamResetReason reason) override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; HttpHealthCheckerImpl& parent_; @@ -241,7 +240,6 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { TcpSessionCallbacks(TcpActiveHealthCheckSession& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override { parent_.onEvent(events); } // Network::ReadFilter diff --git a/source/server/connection_handler.h b/source/server/connection_handler.h index 20d0f62e94d8..2408af1679b9 100644 --- a/source/server/connection_handler.h +++ b/source/server/connection_handler.h @@ -120,8 +120,6 @@ class ConnectionHandler final : NonCopyable { ~ActiveConnection(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} - void onEvent(uint32_t event) override { // Any event leads to destruction of the connection. if (event == Network::ConnectionEvent::LocalClose || diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 1a10d5ad153e..12b746bc02e1 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -5,21 +5,58 @@ #include "common/stats/stats_impl.h" #include "test/mocks/network/mocks.h" +#include "test/mocks/stats/mocks.h" using testing::_; using testing::Sequence; +using testing::InSequence; using testing::Invoke; using testing::Return; +using testing::StrictMock; using testing::Test; namespace Network { +TEST(ConnectionImplUtility, updateBufferStats) { + StrictMock counter; + StrictMock gauge; + uint64_t previous_total = 0; + + InSequence s; + EXPECT_CALL(counter, add(5)); + EXPECT_CALL(gauge, add(5)); + ConnectionImplUtility::updateBufferStats(5, 5, previous_total, counter, gauge); + EXPECT_EQ(5UL, previous_total); + + EXPECT_CALL(counter, add(1)); + EXPECT_CALL(gauge, sub(1)); + ConnectionImplUtility::updateBufferStats(1, 4, previous_total, counter, gauge); + + EXPECT_CALL(gauge, sub(4)); + ConnectionImplUtility::updateBufferStats(0, 0, previous_total, counter, gauge); + + EXPECT_CALL(counter, add(3)); + EXPECT_CALL(gauge, add(3)); + ConnectionImplUtility::updateBufferStats(3, 3, previous_total, counter, gauge); +} + TEST(ConnectionImplDeathTest, BadFd) { Event::DispatcherImpl dispatcher; EXPECT_DEATH(ConnectionImpl(dispatcher, -1, "127.0.0.1"), ".*assert failure: fd_ != -1.*"); } -TEST(ConnectionImplTest, BufferCallbacks) { +struct MockBufferStats { + Connection::BufferStats toBufferStats() { + return {rx_total_, rx_current_, tx_total_, tx_current_}; + } + + StrictMock rx_total_; + StrictMock rx_current_; + StrictMock tx_total_; + StrictMock tx_current_; +}; + +TEST(ConnectionImplTest, BufferStats) { Stats::IsolatedStoreImpl stats_store; Event::DispatcherImpl dispatcher; Network::TcpListenSocket socket(10000); @@ -31,6 +68,8 @@ TEST(ConnectionImplTest, BufferCallbacks) { dispatcher.createClientConnection("tcp://127.0.0.1:10000"); MockConnectionCallbacks client_callbacks; client_connection->addConnectionCallbacks(client_callbacks); + MockBufferStats client_buffer_stats; + client_connection->setBufferStats(client_buffer_stats.toBufferStats()); client_connection->connect(); std::shared_ptr write_filter(new MockWriteFilter()); @@ -44,24 +83,26 @@ TEST(ConnectionImplTest, BufferCallbacks) { .WillOnce(Return(FilterStatus::StopIteration)); EXPECT_CALL(*write_filter, onWrite(_)).InSequence(s1).WillOnce(Return(FilterStatus::Continue)); EXPECT_CALL(*filter, onWrite(_)).InSequence(s1).WillOnce(Return(FilterStatus::Continue)); - EXPECT_CALL(client_callbacks, onBufferChange(ConnectionBufferType::Write, 0, 4)).InSequence(s1); EXPECT_CALL(client_callbacks, onEvent(ConnectionEvent::Connected)).InSequence(s1); - EXPECT_CALL(client_callbacks, onBufferChange(ConnectionBufferType::Write, 4, -4)).InSequence(s1); + EXPECT_CALL(client_buffer_stats.tx_total_, add(4)).InSequence(s1); Network::ConnectionPtr server_connection; Network::MockConnectionCallbacks server_callbacks; + MockBufferStats server_buffer_stats; std::shared_ptr read_filter(new MockReadFilter()); EXPECT_CALL(listener_callbacks, onNewConnection_(_)) .WillOnce(Invoke([&](Network::ConnectionPtr& conn) -> void { server_connection = std::move(conn); server_connection->addConnectionCallbacks(server_callbacks); + server_connection->setBufferStats(server_buffer_stats.toBufferStats()); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); })); Sequence s2; - EXPECT_CALL(server_callbacks, onBufferChange(ConnectionBufferType::Read, 0, 4)).InSequence(s2); - EXPECT_CALL(server_callbacks, onBufferChange(ConnectionBufferType::Read, 4, -4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_total_, add(4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_current_, add(4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_current_, sub(4)).InSequence(s2); EXPECT_CALL(server_callbacks, onEvent(ConnectionEvent::LocalClose)).InSequence(s2); EXPECT_CALL(*read_filter, onNewConnection()); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index b270c1101858..f82186906e9c 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -66,7 +66,6 @@ class FakeConnectionBase : public Network::ConnectionCallbacks { void waitForDisconnect(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; protected: diff --git a/test/integration/integration.h b/test/integration/integration.h index dbf7421cf2ee..57841c32e92d 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -68,7 +68,6 @@ class IntegrationCodecClient : public Http::CodecClientProd { ConnectionCallbacks(IntegrationCodecClient& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; IntegrationCodecClient& parent_; @@ -113,7 +112,6 @@ class IntegrationTcpClient { ConnectionCallbacks(IntegrationTcpClient& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; // Network::ReadFilter diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 901063d351dd..6de69c5930a1 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -16,7 +16,6 @@ class MockConnectionCallbacks : public ConnectionCallbacks { ~MockConnectionCallbacks(); // Network::ConnectionCallbacks - MOCK_METHOD3(onBufferChange, void(ConnectionBufferType type, uint64_t old_size, int64_t delta)); MOCK_METHOD1(onEvent, void(uint32_t events)); }; @@ -54,6 +53,7 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD0(readEnabled, bool()); MOCK_METHOD0(remoteAddress, const std::string&()); + MOCK_METHOD1(setBufferStats, void(const BufferStats& stats)); MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); @@ -82,6 +82,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD0(readEnabled, bool()); MOCK_METHOD0(remoteAddress, const std::string&()); + MOCK_METHOD1(setBufferStats, void(const BufferStats& stats)); MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index ccabe57954af..bff49e913f03 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -7,6 +7,9 @@ namespace Stats { MockCounter::MockCounter() {} MockCounter::~MockCounter() {} +MockGauge::MockGauge() {} +MockGauge::~MockGauge() {} + MockTimespan::MockTimespan() {} MockTimespan::~MockTimespan() {} diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index d31c647ab97c..80261c307d5b 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -20,6 +20,21 @@ class MockCounter : public Counter { MOCK_METHOD0(value, uint64_t()); }; +class MockGauge : public Gauge { +public: + MockGauge(); + ~MockGauge(); + + MOCK_METHOD1(add, void(uint64_t amount)); + MOCK_METHOD0(dec, void()); + MOCK_METHOD0(inc, void()); + MOCK_METHOD0(name, std::string()); + MOCK_METHOD1(set, void(uint64_t value)); + MOCK_METHOD1(sub, void(uint64_t amount)); + MOCK_METHOD0(used, bool()); + MOCK_METHOD0(value, uint64_t()); +}; + class MockTimespan : public Timespan { public: MockTimespan(); From f665fbd12d60c1a460bc2ace7d3121748416885e Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 28 Nov 2016 14:46:13 -0800 Subject: [PATCH 2/3] fix --- source/common/filter/tcp_proxy.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/filter/tcp_proxy.h b/source/common/filter/tcp_proxy.h index 7730c73d6382..898686076983 100644 --- a/source/common/filter/tcp_proxy.h +++ b/source/common/filter/tcp_proxy.h @@ -18,7 +18,7 @@ namespace Filter { // clang-format off #define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \ COUNTER(downstream_cx_rx_bytes_total) \ - GAUGE (downstream_cx_rx_bytes_buffered) \ + GAUGE (downstream_cx_rx_bytes_buffered) \ COUNTER(downstream_cx_tx_bytes_total) \ GAUGE (downstream_cx_tx_bytes_buffered) // clang-format on From 3b314be1a6e08c0c6afa4819f58720ea5901b9fe Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 28 Nov 2016 15:09:09 -0800 Subject: [PATCH 3/3] comment --- source/common/network/connection_impl.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 9b987137a40a..1e35f7fa98f0 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -253,10 +253,8 @@ ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() { if (rc == 0) { action = PostIoAction::Close; break; - } - - // Remote error (might be no data). - if (rc == -1) { + } else if (rc == -1) { + // Remote error (might be no data). conn_log_trace("read error: {}", *this, errno); if (errno == EAGAIN) { action = PostIoAction::KeepOpen;