Skip to content

Commit

Permalink
listen_socket: Support multiple options and allow socket options to b…
Browse files Browse the repository at this point in the history
…e set before bind(). (#2734)

Support multiple options by collecting options to a list and then
applying all of them in order on the socket.

So far the listen socket options have been set in the internal
setSocket() function after the socket has already been bound. This
has the benefit of (re)setting the options whenever a listener
configuration is changed, which may involve a change in listener
filter configuration. Some socket options, however, need to be set
before the bind() call for them to have the desired effect.

This commit adds an enum SocketState parameter to the setOption()
callback, telling the options implementation whether the options are being
set before or after the bind() call. The setOption() callback is called
on the listen socket once with state=PreBind when the socket is first
created right before the bind() call. Then, after the bind() call and
whenever the listener socket is reused for a new or modified listener,
the setOption() callback is called as before, but now with state=PostBind.

Client connections will have the setOptions() called before the bind
(like before) and with state=PreBind. Client sockets are not recycled
like the listen sockets, so they will not have the setOptions() called
again later.

Risk Level: Low

Signed-off-by: Jarno Rajahalme <jarno@covalent.io>
  • Loading branch information
jrajahalme authored and htuch committed Mar 25, 2018
1 parent a596be7 commit 5123d31
Show file tree
Hide file tree
Showing 27 changed files with 248 additions and 153 deletions.
29 changes: 17 additions & 12 deletions include/envoy/network/listen_socket.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <memory>
#include <vector>

#include "envoy/common/pure.h"
#include "envoy/network/address.h"
Expand Down Expand Up @@ -30,36 +31,40 @@ class Socket {
*/
virtual void close() PURE;

enum class SocketState { PreBind, PostBind };

/**
* Visitor class for setting socket options.
*/
class Options {
class Option {
public:
virtual ~Options() {}
virtual ~Option() {}

/**
* @param socket the socket on which to apply options.
* @param state the current state of the socket. Significant for options that can only be
* set for some particular state of the socket.
* @return true if succeeded, false otherwise.
*/
virtual bool setOptions(Socket& socket) const PURE;
virtual bool setOption(Socket& socket, SocketState state) const PURE;

/**
* @return bits that can be used to separate connections based on the options. Should return
* zero if connections with different options can be pooled together. This is limited
* to 32 bits to allow these bits to be efficiently combined into a larger hash key
* used in connection pool lookups.
* @param key supplies the vector of bytes to which the option should append hash key data that
* will be used to separate connections based on the option. Any data already in the key vector
* must not be modified.
*/
virtual uint32_t hashKey() const PURE;
virtual void hashKey(std::vector<uint8_t>& key) const PURE;
};
typedef std::shared_ptr<Options> OptionsSharedPtr;
typedef std::unique_ptr<Option> OptionPtr;
typedef std::shared_ptr<std::vector<OptionPtr>> OptionsSharedPtr;

/**
* Set the socket options for later retrieval with options().
* Add a socket option visitor for later retrieval with options().
*/
virtual void setOptions(const OptionsSharedPtr&) PURE;
virtual void addOption(OptionPtr&&) PURE;

/**
* @return the socket options stored earlier with setOptions(), if any.
* @return the socket options stored earlier with addOption() calls, if any.
*/
virtual const OptionsSharedPtr& options() const PURE;
};
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ListenerFactoryContext : public FactoryContext {
/**
* Store socket options to be set on the listen socket before listening.
*/
virtual void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) PURE;
virtual void addListenSocketOption(Network::Socket::OptionPtr&& option) PURE;
};

/**
Expand Down
4 changes: 3 additions & 1 deletion include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ class ListenerComponentFactory {
/**
* Creates a socket.
* @param address supplies the socket's address.
* @param options to be set on the created socket just before calling 'bind()'.
* @param bind_to_port supplies whether to actually bind the socket.
* @return Network::SocketSharedPtr an initialized and potentially bound socket.
*/
virtual Network::SocketSharedPtr
createListenSocket(Network::Address::InstanceConstSharedPtr address, bool bind_to_port) PURE;
createListenSocket(Network::Address::InstanceConstSharedPtr address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) PURE;

/**
* Creates a list of filter factories.
Expand Down
16 changes: 9 additions & 7 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,15 @@ ClientConnectionImpl::ClientConnectionImpl(
: ConnectionImpl(dispatcher, std::make_unique<ClientSocketImpl>(remote_address),
std::move(transport_socket), false) {
if (options) {
if (!options->setOptions(*socket_)) {
// Set a special error state to ensure asynchronous close to give the owner of the
// ConnectionImpl a chance to add callbacks and detect the "disconnect".
immediate_error_event_ = ConnectionEvent::LocalClose;
// Trigger a write event to close this connection out-of-band.
file_event_->activate(Event::FileReadyType::Write);
return;
for (const auto& option : *options) {
if (!option->setOption(*socket_, Socket::SocketState::PreBind)) {
// Set a special error state to ensure asynchronous close to give the owner of the
// ConnectionImpl a chance to add callbacks and detect the "disconnect".
immediate_error_event_ = ConnectionEvent::LocalClose;
// Trigger a write event to close this connection out-of-band.
file_event_->activate(Event::FileReadyType::Write);
return;
}
}
}
if (source_address != nullptr) {
Expand Down
23 changes: 20 additions & 3 deletions source/common/network/listen_socket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,38 @@ void ListenSocketImpl::doBind() {
}
}

TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address, bool bind_to_port)
/**
 * Applies each option in the given list, in order, to this socket with SocketState::PreBind.
 * A null list is a no-op.
 * @param options the options to apply; may be null.
 * @throws EnvoyException as soon as any option reports failure; later options are not applied.
 */
void ListenSocketImpl::setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) {
  if (!options) {
    return;
  }
  for (const auto& entry : *options) {
    const bool applied = entry->setOption(*this, SocketState::PreBind);
    if (!applied) {
      throw EnvoyException("ListenSocket: Setting socket options failed");
    }
  }
}

// Creates a new stream socket for `address`, enables SO_REUSEADDR on it, applies `options`
// through setListenSocketOptions(), and finally calls doBind() when `bind_to_port` is true.
// @param address supplies the address the socket is created for.
// @param options supplies the socket options to apply before doBind(); may be null.
// @param bind_to_port supplies whether doBind() is called on the new socket.
TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
// Socket creation failure is fatal.
RELEASE_ASSERT(fd_ != -1);

int on = 1;
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1);

// Must stay ahead of doBind(): the options are applied in the PreBind state here.
setListenSocketOptions(options);

if (bind_to_port) {
doBind();
}
}

TcpListenSocket::TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address)
: ListenSocketImpl(fd, address) {}
// Wraps the caller-supplied descriptor `fd` for `address` and applies `options` to it through
// setListenSocketOptions(). This constructor does not call doBind().
// NOTE(review): setListenSocketOptions() applies the options with SocketState::PreBind; if `fd`
// may already be bound when it is handed over, confirm that PreBind is the intended state.
TcpListenSocket::TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
setListenSocketOptions(options);
}

UdsListenSocket::UdsListenSocket(const Address::InstanceConstSharedPtr& address)
: ListenSocketImpl(address->socket(Address::SocketType::Stream), address) {
Expand Down
14 changes: 11 additions & 3 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ class SocketImpl : public virtual Socket {
fd_ = -1;
}
}
void setOptions(const OptionsSharedPtr& options) override { options_ = options; }
// Takes ownership of `option` and appends it to the socket's option list. The list is allocated
// on first use, so options() stays null until at least one option has been added.
void addOption(OptionPtr&& option) override {
  if (options_ == nullptr) {
    options_ = std::make_shared<std::vector<OptionPtr>>();
  }
  options_->push_back(std::move(option));
}
const OptionsSharedPtr& options() const override { return options_; }

protected:
Expand All @@ -44,15 +49,18 @@ class ListenSocketImpl : public SocketImpl {
: SocketImpl(fd, local_address) {}

void doBind();
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options);
};

/**
* Wraps a TCP listen socket.
*/
class TcpListenSocket : public ListenSocketImpl {
public:
TcpListenSocket(const Address::InstanceConstSharedPtr& address, bool bind_to_port);
TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address);
TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port);
TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options);
};

typedef std::unique_ptr<TcpListenSocket> TcpListenSocketPtr;
Expand Down
17 changes: 10 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -836,28 +836,31 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
}

// Inherit socket options from downstream connection, if set.
absl::optional<uint32_t> hash_key;
std::vector<uint8_t> hash_key = {uint8_t(protocol), uint8_t(priority)};

// Use downstream connection socket options for computing connection pool hash key, if any.
// This allows socket options to control connection pooling so that connections with
// different options are not pooled together.
bool have_options = false;
if (context && context->downstreamConnection()) {
const Network::ConnectionSocket::OptionsSharedPtr& options =
context->downstreamConnection()->socketOptions();
if (options) {
hash_key = options->hashKey();
for (const auto& option : *options) {
have_options = true;
option->hashKey(hash_key);
}
}
}

ConnPoolsContainer& container = parent_.host_http_conn_pool_map_[host];
const auto key = container.key(priority, protocol, hash_key ? hash_key.value() : 0);
if (!container.pools_[key]) {
container.pools_[key] = parent_.parent_.factory_.allocateConnPool(
if (!container.pools_[hash_key]) {
container.pools_[hash_key] = parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
hash_key ? context->downstreamConnection()->socketOptions() : nullptr);
have_options ? context->downstreamConnection()->socketOptions() : nullptr);
}

return container.pools_[key].get();
return container.pools_[hash_key].get();
}

ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
Expand Down
14 changes: 2 additions & 12 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "envoy/config/bootstrap/v2/bootstrap.pb.h"
Expand Down Expand Up @@ -202,17 +202,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
*/
struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject {
struct ConnPoolsContainer {
typedef std::unordered_map<uint64_t, Http::ConnectionPool::InstancePtr> ConnPools;

uint64_t key(ResourcePriority priority, Http::Protocol protocol, uint32_t hash_key) {
// One bit needed for priority
static_assert(NumResourcePriorities == 2,
"Fix shifts below to match number of bits needed for 'priority'");
// Two bits needed for protocol
static_assert(Http::NumProtocols <= 4,
"Fix shifts below to match number of bits needed for 'protocol'");
return uint64_t(hash_key) << 3 | uint64_t(protocol) << 1 | uint64_t(priority);
}
typedef std::map<std::vector<uint8_t>, Http::ConnectionPool::InstancePtr> ConnPools;

ConnPools pools_;
uint64_t drains_remaining_{};
Expand Down
1 change: 1 addition & 0 deletions source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class ValidationInstance : Logger::Loggable<Logger::Id::main>,
return ProdListenerComponentFactory::createListenerFilterFactoryList_(filters, context);
}
Network::SocketSharedPtr createListenSocket(Network::Address::InstanceConstSharedPtr,
const Network::Socket::OptionsSharedPtr&,
bool) override {
// Returned sockets are not currently used so we can return nothing here safely vs. a
// validation mock.
Expand Down
2 changes: 1 addition & 1 deletion source/server/http/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ AdminImpl::AdminImpl(const std::string& access_log_path, const std::string& prof
Network::Address::InstanceConstSharedPtr address, Server::Instance& server,
Stats::ScopePtr&& listener_scope)
: server_(server), profile_path_(profile_path),
socket_(new Network::TcpListenSocket(address, true)),
socket_(new Network::TcpListenSocket(address, nullptr, true)),
stats_(Http::ConnectionManagerImpl::generateStats("http.admin.", server_.stats())),
tracing_stats_(Http::ConnectionManagerImpl::generateTracingStats("http.admin.tracing.",
server_.stats())),
Expand Down
33 changes: 19 additions & 14 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ ProdListenerComponentFactory::createListenerFilterFactoryList_(

Network::SocketSharedPtr
ProdListenerComponentFactory::createListenSocket(Network::Address::InstanceConstSharedPtr address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port) {
ASSERT(address->type() == Network::Address::Type::Ip ||
address->type() == Network::Address::Type::Pipe);
Expand All @@ -94,9 +95,9 @@ ProdListenerComponentFactory::createListenSocket(Network::Address::InstanceConst
const int fd = server_.hotRestart().duplicateParentListenSocket(addr);
if (fd != -1) {
ENVOY_LOG(debug, "obtained socket for address {} from parent", addr);
return std::make_shared<Network::TcpListenSocket>(fd, address);
return std::make_shared<Network::TcpListenSocket>(fd, address, options);
}
return std::make_shared<Network::TcpListenSocket>(address, bind_to_port);
return std::make_shared<Network::TcpListenSocket>(address, options, bind_to_port);
}

DrainManagerPtr
Expand Down Expand Up @@ -275,14 +276,17 @@ void ListenerImpl::setSocket(const Network::SocketSharedPtr& socket) {
socket_ = socket;
// Server config validation sets nullptr sockets.
if (socket_ && listen_socket_options_) {
bool ok = listen_socket_options_->setOptions(*socket_);
const std::string message =
fmt::format("{}: Setting socket options {}", name_, ok ? "succeeded" : "failed");
if (!ok) {
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
} else {
ENVOY_LOG(debug, "{}", message);
// Use SocketState::PostBind, as bind() is never done after this.
for (const auto& option : *listen_socket_options_) {
bool ok = option->setOption(*socket_, Network::Socket::SocketState::PostBind);
const std::string message =
fmt::format("{}: Setting socket options {}", name_, ok ? "succeeded" : "failed");
if (!ok) {
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
} else {
ENVOY_LOG(debug, "{}", message);
}
}
}
}
Expand Down Expand Up @@ -394,10 +398,11 @@ bool ListenerManagerImpl::addOrUpdateListener(const envoy::api::v2::Listener& co
draining_listener_socket = existing_draining_listener->listener_->getSocket();
}

new_listener->setSocket(
draining_listener_socket
? draining_listener_socket
: factory_.createListenSocket(new_listener->address(), new_listener->bindToPort()));
new_listener->setSocket(draining_listener_socket
? draining_listener_socket
: factory_.createListenSocket(new_listener->address(),
new_listener->listenSocketOptions(),
new_listener->bindToPort()));
if (workers_started_) {
new_listener->debugLog("add warming listener");
warming_listeners_.emplace_back(std::move(new_listener));
Expand Down
10 changes: 8 additions & 2 deletions source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ProdListenerComponentFactory : public ListenerComponentFactory,
}

Network::SocketSharedPtr createListenSocket(Network::Address::InstanceConstSharedPtr address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port) override;
DrainManagerPtr createDrainManager(envoy::api::v2::Listener::DrainType drain_type) override;
uint64_t nextListenerTag() override { return next_listener_tag_++; }
Expand Down Expand Up @@ -208,6 +209,8 @@ class ListenerImpl : public Network::ListenerConfig,
void initialize();
DrainManager& localDrainManager() const { return *local_drain_manager_; }
void setSocket(const Network::SocketSharedPtr& socket);
void setSocketAndOptions(const Network::SocketSharedPtr& socket);
const Network::Socket::OptionsSharedPtr& listenSocketOptions() { return listen_socket_options_; }

// Network::ListenerConfig
Network::FilterChainFactory& filterChainFactory() override { return *this; }
Expand Down Expand Up @@ -246,8 +249,11 @@ class ListenerImpl : public Network::ListenerConfig,
ThreadLocal::Instance& threadLocal() override { return parent_.server_.threadLocal(); }
Admin& admin() override { return parent_.server_.admin(); }
const envoy::api::v2::core::Metadata& listenerMetadata() const override { return metadata_; };
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) override {
listen_socket_options_ = options;
// Takes ownership of `option` and appends it to the listener's socket option list. The list is
// allocated on first use.
void addListenSocketOption(Network::Socket::OptionPtr&& option) override {
  if (listen_socket_options_ == nullptr) {
    listen_socket_options_ = std::make_shared<std::vector<Network::Socket::OptionPtr>>();
  }
  listen_socket_options_->push_back(std::move(option));
}

// Network::DrainDecision
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/codec_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class CodecNetworkTest : public testing::TestWithParam<Network::Address::IpVersi
Network::MockConnectionHandler connection_handler_;
Network::Address::InstanceConstSharedPtr source_address_;
Stats::IsolatedStoreImpl stats_store_;
Network::TcpListenSocket socket_{Network::Test::getAnyAddress(GetParam()), true};
Network::TcpListenSocket socket_{Network::Test::getAnyAddress(GetParam()), nullptr, true};
Http::MockClientConnection* codec_;
std::unique_ptr<CodecClientForTest> client_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
Expand Down
1 change: 1 addition & 0 deletions test/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ envoy_cc_test(
deps = [
"//source/common/network:listen_socket_lib",
"//source/common/network:utility_lib",
"//test/mocks/network:network_mocks",
"//test/test_common:environment_lib",
"//test/test_common:network_utility_lib",
],
Expand Down
Loading

0 comments on commit 5123d31

Please sign in to comment.