Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce listener TCP connection buffer configuration and implement … #558

Merged
merged 6 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ use_original_dst
destination port. If there is no listener associated with the original destination port, the
connection is handled by the listener that receives it. Default is false.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the listener's new connection read and write buffers. If unspecified, an implementation defined default is applied (1MB).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 100 col line break


.. toctree::
:hidden:

Expand Down
18 changes: 11 additions & 7 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,16 @@ class Dispatcher {
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param per_connection_buffer_limit_bytes soft limit on size of the listener's new connection
* read and write buffers.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
bool use_proxy_proto, bool use_orig_dst,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, to ask you to do this, but I have a feeling we are going to keep adding params/options here. (I know soon we will split use_original_dst into use_orginal_port/use_original_address, etc.). Can we define a struct ListenerOptions and just pass that. That way we don't have to change a gazillion mocks and callsites the next time we add a param.

size_t per_connection_buffer_limit_bytes) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of size_t, can we be specific with either uint32_t or uint64_t (probably uint32_t). Same applies all the other places we reference this.


/**
* Create a listener on a specific port.
Expand All @@ -108,14 +111,15 @@ class Dispatcher {
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param per_connection_buffer_limit_bytes soft limit on size of the listener's new connection
* read and write buffers.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
virtual Network::ListenerPtr
createSslListener(Network::ConnectionHandler& conn_handler, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst, size_t per_connection_buffer_limit_bytes) PURE;

/**
* Allocate a timer. @see Event::Timer for docs on how to use the timer.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* are installed.
*/
virtual void write(Buffer::Instance& data) PURE;

/**
* Set a soft limit on the size of the read buffer prior to flushing to further stages in the
* processing pipeline.
*/
virtual void setReadBufferLimit(size_t limit) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint32_t

};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
10 changes: 8 additions & 2 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class ConnectionHandler {
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param per_connection_buffer_limit_bytes soft limit on size of the listener's new connection
* read and write buffers.
*/
virtual void addListener(Network::FilterChainFactory& factory, Network::ListenSocket& socket,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst) PURE;
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) PURE;

/**
* Adds listener to the handler.
Expand All @@ -43,10 +46,13 @@ class ConnectionHandler {
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param per_connection_buffer_limit_bytes soft limit on size of the listener's new connection
* read and write buffers.
*/
virtual void addSslListener(Network::FilterChainFactory& factory, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) PURE;

/**
* Find a listener based on the provided listener port value.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class Listener {
* allow the listener to hand it off to the listener associated to the original port
*/
virtual bool useOriginalDst() PURE;

/**
* @return size_t providing a soft limit on size of the listener's new connection read and write
* buffers.
*/
virtual size_t perConnectionBufferLimitBytes() PURE;
};

typedef std::unique_ptr<Listener> ListenerPtr;
Expand Down
17 changes: 10 additions & 7 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,23 @@ Network::ListenerPtr DispatcherImpl::createListener(Network::ConnectionHandler&
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
return Network::ListenerPtr{new Network::ListenerImpl(
conn_handler, *this, socket, cb, stats_store, bind_to_port, use_proxy_proto, use_orig_dst)};
bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) {
return Network::ListenerPtr{
new Network::ListenerImpl(conn_handler, *this, socket, cb, stats_store, bind_to_port,
use_proxy_proto, use_orig_dst, per_connection_buffer_limit_bytes)};
}

Network::ListenerPtr DispatcherImpl::createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
return Network::ListenerPtr{new Network::SslListenerImpl(conn_handler, *this, ssl_ctx, socket, cb,
stats_store, bind_to_port,
use_proxy_proto, use_orig_dst)};
bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) {
return Network::ListenerPtr{new Network::SslListenerImpl(
conn_handler, *this, ssl_ctx, socket, cb, stats_store, bind_to_port, use_proxy_proto,
use_orig_dst, per_connection_buffer_limit_bytes)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) { return TimerPtr{new TimerImpl(*this, cb)}; }
Expand Down
7 changes: 4 additions & 3 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) override;
bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) override;
Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst) override;
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
7 changes: 6 additions & 1 deletion source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ const std::string Json::Schema::LISTENER_SCHEMA(R"EOF(
"ssl_context" : {"$ref" : "#/definitions/ssl_context"},
"bind_to_port" : {"type": "boolean"},
"use_proxy_proto" : {"type" : "boolean"},
"use_original_dst" : {"type" : "boolean"}
"use_original_dst" : {"type" : "boolean"},
"per_connection_buffer_limit_bytes" : {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"per_connection_read_buffer_limit_bytes" ? Next we will have write high/low watermark so might get confusing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning on using per_connection_buffer_limit_bytes to drive both the watermark for the write buffer (automatically setting low watermark at a fraction like 0.5) and the read buffer, under the assumption that we want to keep config simple and that read/write buffer limits should be somewhat symmetric in general. I know there are scenarios where this isn't true, but this could be reasonable for v1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK sounds good.

"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
}
},
"required": ["port", "filters"],
"additionalProperties": false
Expand Down
10 changes: 6 additions & 4 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
}

ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
PostIoAction action;
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
do {
// 16K read is arbitrary. IIRC, libevent will currently clamp this to 4K. libevent will also
Expand All @@ -288,15 +288,17 @@ ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
} else if (rc == -1) {
// Remote error (might be no data).
conn_log_trace("read error: {}", *this, errno);
if (errno == EAGAIN) {
action = PostIoAction::KeepOpen;
} else {
if (errno != EAGAIN) {
action = PostIoAction::Close;
}

break;
} else {
bytes_read += rc;
if (shouldDrainReadBuffer()) {
setReadBufferReady();
break;
}
}
} while (true);

Expand Down
12 changes: 12 additions & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class ConnectionImpl : public virtual Connection,
Ssl::Connection* ssl() override { return nullptr; }
State state() override;
void write(Buffer::Instance& data) override;
void setReadBufferLimit(size_t limit) override { read_buffer_limit_ = limit; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint32_t


// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand All @@ -79,6 +80,16 @@ class ConnectionImpl : public virtual Connection,
virtual void closeSocket(uint32_t close_type);
void doConnect();
void raiseEvents(uint32_t events);
// Should the read buffer be drained?
bool shouldDrainReadBuffer() {
return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_;
}
// Mark read buffer ready to read in the event loop. This is used when yielding following
// shouldDrainReadBuffer().
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() { file_event_->activate(Event::FileReadyType::Read); }

static const Address::InstancePtr null_local_address_;

Expand All @@ -87,6 +98,7 @@ class ConnectionImpl : public virtual Connection,
Address::InstancePtr local_address_;
Buffer::OwnedImpl read_buffer_;
Buffer::OwnedImpl write_buffer_;
size_t read_buffer_limit_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint32_t


private:
// clang-format off
Expand Down
8 changes: 6 additions & 2 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
ListenerImpl::ListenerImpl(Network::ConnectionHandler& conn_handler,
Event::DispatcherImpl& dispatcher, ListenSocket& socket,
ListenerCallbacks& cb, Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst)
bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes)
: connection_handler_(conn_handler), dispatcher_(dispatcher), socket_(socket), cb_(cb),
bind_to_port_(bind_to_port), use_proxy_proto_(use_proxy_proto), proxy_protocol_(stats_store),
use_original_dst_(use_orig_dst), listener_(nullptr) {
use_original_dst_(use_orig_dst),
per_connection_buffer_limit_bytes_(per_connection_buffer_limit_bytes), listener_(nullptr) {

if (bind_to_port_) {
listener_.reset(
Expand All @@ -90,6 +92,7 @@ void ListenerImpl::errorCallback(evconnlistener*, void*) {
void ListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
Address::InstancePtr local_address) {
ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, local_address));
new_connection->setReadBufferLimit(per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand All @@ -98,6 +101,7 @@ void SslListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address,
local_address, ssl_ctx_,
Ssl::ConnectionImpl::InitialState::Server));
new_connection->setReadBufferLimit(per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand Down
8 changes: 5 additions & 3 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class ListenerImpl : public Listener {
public:
ListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
ListenSocket& socket, ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst);
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst,
size_t per_connection_buffer_limit_bytes);

/**
* Accept/process a new connection.
Expand All @@ -47,6 +48,7 @@ class ListenerImpl : public Listener {
const bool use_proxy_proto_;
ProxyProtocol proxy_protocol_;
const bool use_original_dst_;
const size_t per_connection_buffer_limit_bytes_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just have a ListenerOptions as a member var and then you can just copy it in via passed in options.


private:
static void errorCallback(evconnlistener* listener, void* context);
Expand All @@ -60,9 +62,9 @@ class SslListenerImpl : public ListenerImpl {
SslListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
Ssl::Context& ssl_ctx, ListenSocket& socket, ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst)
bool use_orig_dst, size_t per_connection_buffer_limit_bytes)
: ListenerImpl(conn_handler, dispatcher, socket, cb, stats_store, bind_to_port,
use_proxy_proto, use_orig_dst),
use_proxy_proto, use_orig_dst, per_connection_buffer_limit_bytes),
ssl_ctx_(ssl_ctx) {}

// ListenerImpl
Expand Down
4 changes: 4 additions & 0 deletions source/common/ssl/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {

if (slices_to_commit > 0) {
read_buffer_.commit(slices, slices_to_commit);
if (shouldDrainReadBuffer()) {
setReadBufferReady();
keep_reading = false;
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ MainImpl::ListenerConfig::ListenerConfig(MainImpl& parent, Json::Object& json)
bind_to_port_ = json.getBoolean("bind_to_port", true);
use_proxy_proto_ = json.getBoolean("use_proxy_proto", false);
use_original_dst_ = json.getBoolean("use_original_dst", false);
per_connection_buffer_limit_bytes_ =
json.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024);

std::vector<Json::ObjectPtr> filters = json.getObjectArray("filters");
for (size_t i = 0; i < filters.size(); i++) {
Expand Down
2 changes: 2 additions & 0 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
Ssl::ServerContext* sslContext() override { return ssl_context_.get(); }
bool useProxyProto() override { return use_proxy_proto_; }
bool useOriginalDst() override { return use_original_dst_; }
size_t perConnectionBufferLimitBytes() override { return per_connection_buffer_limit_bytes_; }

// Network::FilterChainFactory
bool createFilterChain(Network::Connection& connection) override;
Expand All @@ -107,6 +108,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
Ssl::ServerContextPtr ssl_context_;
bool use_proxy_proto_{};
bool use_original_dst_{};
size_t per_connection_buffer_limit_bytes_{};
std::list<NetworkFilterFactoryCb> filter_factories_;
};

Expand Down
Loading