Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce listener TCP connection buffer configuration and implement … #558

Merged
merged 6 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ use_original_dst
destination port. If there is no listener associated with the original destination port, the
connection is handled by the listener that receives it. Default is false.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the listener's new connection read and write buffers.
If unspecified, an implementation defined default is applied (1MB).

.. toctree::
:hidden:

Expand Down
36 changes: 11 additions & 25 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,13 @@ class Dispatcher {
* @param socket supplies the socket to listen on.
* @param cb supplies the callbacks to invoke for listener events.
* @param stats_store supplies the Stats::Store to use.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
virtual Network::ListenerPtr
createListener(Network::ConnectionHandler& conn_handler, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) PURE;

/**
* Create a listener on a specific port.
Expand All @@ -101,21 +94,14 @@ class Dispatcher {
* @param socket supplies the socket to listen on.
* @param cb supplies the callbacks to invoke for listener events.
* @param stats_store supplies the Stats::Store to use.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
virtual Network::ListenerPtr
createSslListener(Network::ConnectionHandler& conn_handler, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) PURE;

/**
* Allocate a timer. @see Event::Timer for docs on how to use the timer.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* are installed.
*/
virtual void write(Buffer::Instance& data) PURE;

/**
* Set a soft limit on the size of the read buffer prior to flushing to further stages in the
* processing pipeline.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
22 changes: 5 additions & 17 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,20 @@ class ConnectionHandler {
* Adds listener to the handler.
* @param factory supplies the configuration factory for new connections.
* @param socket supplies the already bound socket to listen on.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
*/
virtual void addListener(Network::FilterChainFactory& factory, Network::ListenSocket& socket,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst) PURE;
const Network::ListenerOptions& listener_options) PURE;

/**
* Adds listener to the handler.
* @param factory supplies the configuration factory for new connections.
* @param socket supplies the already bound socket to listen on.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
*/
virtual void addSslListener(Network::FilterChainFactory& factory, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
Network::ListenSocket& socket,
const Network::ListenerOptions& listener_options) PURE;

/**
* Find a listener based on the provided listener port value.
Expand Down
28 changes: 28 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,34 @@

namespace Network {

/**
* Listener configurations options.
*/
struct ListenerOptions {
// Specifies if the listener should actually bind to the port. A listener that doesn't bind can
// only receive connections redirected from other listeners that set use_origin_dst_ to true.
bool bind_to_port_;
// Whether to use the PROXY Protocol V1
// (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
bool use_proxy_proto_;
// If a connection was redirected to this port using iptables, allow the listener to hand it off
// to the listener associated to the original port.
bool use_original_dst_;
// Soft limit on size of the listener's new connection read and write buffers.
uint32_t per_connection_buffer_limit_bytes_;

/**
* Factory for ListenerOptions with bind_to_port_ set.
* @return ListenerOptions object initialized with bind_to_port_ set.
*/
static ListenerOptions listenerOptionsWithBindToPort() {
return {.bind_to_port_ = true,
.use_proxy_proto_ = false,
.use_original_dst_ = false,
.per_connection_buffer_limit_bytes_ = 0};
}
};

/**
* Callbacks invoked by a listener.
*/
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class Listener {
* allow the listener to hand it off to the listener associated to the original port
*/
virtual bool useOriginalDst() PURE;

/**
* @return uint32_t providing a soft limit on size of the listener's new connection read and write
* buffers.
*/
virtual uint32_t perConnectionBufferLimitBytes() PURE;
};

typedef std::unique_ptr<Listener> ListenerPtr;
Expand Down
28 changes: 13 additions & 15 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,22 @@ Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this)};
}

Network::ListenerPtr DispatcherImpl::createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
return Network::ListenerPtr{new Network::ListenerImpl(
conn_handler, *this, socket, cb, stats_store, bind_to_port, use_proxy_proto, use_orig_dst)};
Network::ListenerPtr
DispatcherImpl::createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) {
return Network::ListenerPtr{
new Network::ListenerImpl(conn_handler, *this, socket, cb, stats_store, listener_options)};
}

Network::ListenerPtr DispatcherImpl::createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
Network::ListenerPtr
DispatcherImpl::createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) {
return Network::ListenerPtr{new Network::SslListenerImpl(conn_handler, *this, ssl_ctx, socket, cb,
stats_store, bind_to_port,
use_proxy_proto, use_orig_dst)};
stats_store, listener_options)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) { return TimerPtr{new TimerImpl(*this, cb)}; }
Expand Down
7 changes: 3 additions & 4 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Filesystem::WatcherPtr createFilesystemWatcher() override;
Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) override;
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) override;
Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst) override;
const Network::ListenerOptions& listener_options) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
7 changes: 6 additions & 1 deletion source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ const std::string Json::Schema::LISTENER_SCHEMA(R"EOF(
"ssl_context" : {"$ref" : "#/definitions/ssl_context"},
"bind_to_port" : {"type": "boolean"},
"use_proxy_proto" : {"type" : "boolean"},
"use_original_dst" : {"type" : "boolean"}
"use_original_dst" : {"type" : "boolean"},
"per_connection_buffer_limit_bytes" : {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"per_connection_read_buffer_limit_bytes" ? Next we will have write high/low watermark so might get confusing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning on using per_connection_buffer_limit_bytes to drive both the watermark for the write buffer (automatically setting low watermark at a fraction like 0.5) and the read buffer, under the assumption that we want to keep config simple and that read/write buffer limits should be somewhat symmetric in general. I know there are scenarios where this isn't true, but this could be reasonable for v1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK sounds good.

"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
}
},
"required": ["port", "filters"],
"additionalProperties": false
Expand Down
10 changes: 6 additions & 4 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
}

ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
PostIoAction action;
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
do {
// 16K read is arbitrary. IIRC, libevent will currently clamp this to 4K. libevent will also
Expand All @@ -288,15 +288,17 @@ ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
} else if (rc == -1) {
// Remote error (might be no data).
conn_log_trace("read error: {}", *this, errno);
if (errno == EAGAIN) {
action = PostIoAction::KeepOpen;
} else {
if (errno != EAGAIN) {
action = PostIoAction::Close;
}

break;
} else {
bytes_read += rc;
if (shouldDrainReadBuffer()) {
setReadBufferReady();
break;
}
}
} while (true);

Expand Down
12 changes: 12 additions & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class ConnectionImpl : public virtual Connection,
Ssl::Connection* ssl() override { return nullptr; }
State state() override;
void write(Buffer::Instance& data) override;
void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; }

// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand All @@ -79,6 +80,16 @@ class ConnectionImpl : public virtual Connection,
virtual void closeSocket(uint32_t close_type);
void doConnect();
void raiseEvents(uint32_t events);
// Should the read buffer be drained?
bool shouldDrainReadBuffer() {
return read_buffer_limit_ > 0 && read_buffer_.length() >= read_buffer_limit_;
}
// Mark read buffer ready to read in the event loop. This is used when yielding following
// shouldDrainReadBuffer().
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() { file_event_->activate(Event::FileReadyType::Read); }

static const Address::InstancePtr null_local_address_;

Expand All @@ -87,6 +98,7 @@ class ConnectionImpl : public virtual Connection,
Address::InstancePtr local_address_;
Buffer::OwnedImpl read_buffer_;
Buffer::OwnedImpl write_buffer_;
uint32_t read_buffer_limit_ = 0;

private:
// clang-format off
Expand Down
15 changes: 8 additions & 7 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);

Address::InstancePtr final_local_address = listener->socket_.localAddress();
if (listener->use_original_dst_ && final_local_address->type() == Address::Type::Ip) {
if (listener->options_.use_original_dst_ && final_local_address->type() == Address::Type::Ip) {
Address::InstancePtr original_local_address = listener->getOriginalDst(fd);
if (original_local_address) {
final_local_address = original_local_address;
Expand All @@ -42,7 +42,7 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
}
}

if (listener->use_proxy_proto_) {
if (listener->options_.use_proxy_proto_) {
listener->proxy_protocol_.newConnection(listener->dispatcher_, fd, *listener);
} else {
Address::InstancePtr final_remote_address;
Expand All @@ -62,13 +62,12 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*

ListenerImpl::ListenerImpl(Network::ConnectionHandler& conn_handler,
Event::DispatcherImpl& dispatcher, ListenSocket& socket,
ListenerCallbacks& cb, Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst)
ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options)
: connection_handler_(conn_handler), dispatcher_(dispatcher), socket_(socket), cb_(cb),
bind_to_port_(bind_to_port), use_proxy_proto_(use_proxy_proto), proxy_protocol_(stats_store),
use_original_dst_(use_orig_dst), listener_(nullptr) {
proxy_protocol_(stats_store), options_(listener_options), listener_(nullptr) {

if (bind_to_port_) {
if (options_.bind_to_port_) {
listener_.reset(
evconnlistener_new(&dispatcher_.base(), listenCallback, this, 0, -1, socket.fd()));

Expand All @@ -90,6 +89,7 @@ void ListenerImpl::errorCallback(evconnlistener*, void*) {
void ListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
Address::InstancePtr local_address) {
ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, local_address));
new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand All @@ -98,6 +98,7 @@ void SslListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address,
local_address, ssl_ctx_,
Ssl::ConnectionImpl::InitialState::Server));
new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand Down
12 changes: 4 additions & 8 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ListenerImpl : public Listener {
public:
ListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
ListenSocket& socket, ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst);
const ListenerOptions& listener_options);

/**
* Accept/process a new connection.
Expand All @@ -43,10 +43,8 @@ class ListenerImpl : public Listener {
Event::DispatcherImpl& dispatcher_;
ListenSocket& socket_;
ListenerCallbacks& cb_;
const bool bind_to_port_;
const bool use_proxy_proto_;
ProxyProtocol proxy_protocol_;
const bool use_original_dst_;
const ListenerOptions options_;

private:
static void errorCallback(evconnlistener* listener, void* context);
Expand All @@ -59,10 +57,8 @@ class SslListenerImpl : public ListenerImpl {
public:
SslListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
Ssl::Context& ssl_ctx, ListenSocket& socket, ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst)
: ListenerImpl(conn_handler, dispatcher, socket, cb, stats_store, bind_to_port,
use_proxy_proto, use_orig_dst),
Stats::Store& stats_store, const Network::ListenerOptions& listener_options)
: ListenerImpl(conn_handler, dispatcher, socket, cb, stats_store, listener_options),
ssl_ctx_(ssl_ctx) {}

// ListenerImpl
Expand Down
4 changes: 4 additions & 0 deletions source/common/ssl/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {

if (slices_to_commit > 0) {
read_buffer_.commit(slices, slices_to_commit);
if (shouldDrainReadBuffer()) {
setReadBufferReady();
keep_reading = false;
}
}
}

Expand Down
Loading