Skip to content

Commit

Permalink
Server-side support for UNIX-domain stream sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
ronen-fr committed Aug 8, 2019
1 parent 0f1c501 commit 2c364e9
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 30 deletions.
4 changes: 4 additions & 0 deletions include/seastar/core/posix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ public:
auto r = ::bind(_fd, &sa, sl);
throw_system_error_on(r == -1, "bind");
}
void unixdomain_bind(sockaddr_un sa) {
auto r = ::bind(_fd, (struct sockaddr *)&sa, sizeof(sockaddr_un));
throw_system_error_on(r == -1, "bind");
}
void connect(sockaddr& sa, socklen_t sl) {
auto r = ::connect(_fd, &sa, sl);
if (r == -1 && errno == EINPROGRESS) {
Expand Down
2 changes: 1 addition & 1 deletion include/seastar/core/sharded.hh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public:
class no_sharded_instance_exception : public std::exception {
public:
virtual const char* what() const noexcept override {
return "sharded instance does not exists";
return "sharded instance does not exist";
}
};

Expand Down
2 changes: 2 additions & 0 deletions include/seastar/net/posix-stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public:
};
using posix_tcp_ap_server_socket_impl = posix_ap_server_socket_impl<transport::TCP>;
using posix_sctp_ap_server_socket_impl = posix_ap_server_socket_impl<transport::SCTP>;
using posix_unix_ap_server_socket_impl = posix_ap_server_socket_impl<transport::UNIX>;

template <transport Transport>
class posix_server_socket_impl : public server_socket_impl {
Expand All @@ -166,6 +167,7 @@ public:
};
using posix_server_tcp_socket_impl = posix_server_socket_impl<transport::TCP>;
using posix_server_sctp_socket_impl = posix_server_socket_impl<transport::SCTP>;
using posix_server_unix_socket_impl = posix_server_socket_impl<transport::UNIX>;

template <transport Transport>
class posix_reuseport_server_socket_impl : public server_socket_impl {
Expand Down
24 changes: 23 additions & 1 deletion include/seastar/net/socket_defs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
#include <iosfwd>
#include <array>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/ip.h>
#include <seastar/net/byteorder.hh>
#include <filesystem>
#include <string_view>

namespace seastar {

Expand All @@ -34,6 +37,7 @@ class inet_address;

struct ipv4_addr;
struct ipv6_addr;
struct ud_addr; // unix-domain socket address

class socket_address {
public:
Expand All @@ -42,6 +46,7 @@ public:
::sockaddr sa;
::sockaddr_in in;
::sockaddr_in6 in6;
::sockaddr_un un;
} u;
socket_address(const sockaddr_in& sa) {
u.in = sa;
Expand All @@ -53,13 +58,17 @@ public:
socket_address(ipv4_addr);
socket_address(const ipv6_addr&);
socket_address(const net::inet_address&, uint16_t p = 0);
socket_address(const std::filesystem::path);
socket_address(const ud_addr&);
socket_address();
::sockaddr& as_posix_sockaddr() { return u.sa; }
::sockaddr_un& as_posix_sockaddr_unix() { return u.un; }
::sockaddr_in& as_posix_sockaddr_in() { return u.in; }
::sockaddr_in6& as_posix_sockaddr_in6() { return u.in6; }
const ::sockaddr& as_posix_sockaddr() const { return u.sa; }
const ::sockaddr_in& as_posix_sockaddr_in() const { return u.in; }
const ::sockaddr_in6& as_posix_sockaddr_in6() const { return u.in6; }
const ::sockaddr_un& as_posix_sockaddr_unix() const { return u.un; }

socket_address(uint32_t, uint16_t p = 0);

Expand All @@ -77,7 +86,8 @@ std::ostream& operator<<(std::ostream&, const socket_address&);

enum class transport {
TCP = IPPROTO_TCP,
SCTP = IPPROTO_SCTP
SCTP = IPPROTO_SCTP,
UNIX
};


Expand Down Expand Up @@ -123,6 +133,14 @@ struct ipv6_addr {
}
};

struct ud_addr {
std::filesystem::path sfile_;

explicit ud_addr(const std::string&);
explicit ud_addr(std::string_view fn) { sfile_ = fn; }
explicit ud_addr(const std::filesystem::path& fn) { sfile_ = fn; }
};

std::ostream& operator<<(std::ostream&, const ipv4_addr&);
std::ostream& operator<<(std::ostream&, const ipv6_addr&);

Expand All @@ -141,5 +159,9 @@ template<>
struct hash<seastar::ipv4_addr> {
size_t operator()(const seastar::ipv4_addr&) const;
};
template<>
struct hash<seastar::ud_addr> {
size_t operator()(const seastar::ud_addr&) const;
};

}
2 changes: 1 addition & 1 deletion src/core/alien.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ size_t message_queue::process_queue(lf_queue& q, Func process) {
++nr;
}
std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nr ? items[nr - 1] : wi);
unsigned i = 0;
size_t i = 0;
do {
prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt);
process(wi);
Expand Down
40 changes: 27 additions & 13 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1775,21 +1775,35 @@ void reactor_backend_epoll::forget(pollable_fd_state& fd) {

pollable_fd
reactor::posix_listen(socket_address sa, listen_options opts) {
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, int(opts.proto));
if (opts.reuse_address) {
fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
}
if (_reuseport)
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
if (opts.proto == transport::UNIX) {

try {
fd.bind(sa.u.sa, sizeof(sa.u.sas));
fd.listen(100);
} catch (const std::system_error& s) {
throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa));
}
file_desc fd = file_desc::socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
try {
fd.unixdomain_bind(sa.as_posix_sockaddr_unix());
fd.listen(100);
} catch (const std::system_error& s) {
throw std::system_error(s.code(), fmt::format("posix_listen failed for unix-domain path {}", sa.as_posix_sockaddr_unix().sun_path));
}

return pollable_fd(std::move(fd));

return pollable_fd(std::move(fd));
} else {
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, int(opts.proto));
if (opts.reuse_address) {
fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
}
if (_reuseport)
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);

try {
fd.bind(sa.u.sa, sizeof(sa.u.sas));
fd.listen(100);
} catch (const std::system_error& s) {
throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa));
}

return pollable_fd(std::move(fd));
}
}

bool
Expand Down
79 changes: 65 additions & 14 deletions src/net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,32 @@ class posix_connected_socket_operations<transport::SCTP> {
}
};

template <>
class posix_connected_socket_operations<transport::UNIX> {
public:
void set_nodelay(file_desc& _fd, bool nodelay) {
// meaningless for Unix-domain. No Nagle's algorithm...
}
bool get_nodelay(file_desc& _fd) const {
return true;
}
void set_keepalive(file_desc& _fd, bool keepalive) {
// meaningless for Unix-domain
}
bool get_keepalive(file_desc& _fd) const {
return true; //_fd.getsockopt<int>(SOL_SOCKET, SO_KEEPALIVE);
}
void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) {
}
keepalive_params get_keepalive_parameters(file_desc& _fd) const {
return tcp_keepalive_params {
std::chrono::seconds(1),
std::chrono::seconds(1),
0
};
}
};

template <transport Transport>
class posix_connected_socket_impl final : public connected_socket_impl, posix_connected_socket_operations<Transport> {
lw_shared_ptr<pollable_fd> _fd;
Expand Down Expand Up @@ -153,8 +179,10 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co
friend class posix_ap_network_stack;
friend class posix_socket_impl;
};

using posix_connected_tcp_socket_impl = posix_connected_socket_impl<transport::TCP>;
using posix_connected_sctp_socket_impl = posix_connected_socket_impl<transport::SCTP>;
using posix_connected_unix_socket_impl = posix_connected_socket_impl<transport::UNIX>;

class posix_socket_impl final : public socket_impl {
lw_shared_ptr<pollable_fd> _fd;
Expand All @@ -165,6 +193,9 @@ class posix_socket_impl final : public socket_impl {
static thread_local std::uniform_int_distribution<uint16_t> u(49152/smp::count + 1, 65535/smp::count - 1);
return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable {
uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port;
if (proto == transport::UNIX)
return make_exception_future<bool_class<stop_iteration_tag>>(std::system_error(ESOCKTNOSUPPORT, std::system_category()));

local.as_posix_sockaddr_in().sin_port = hton(port);
return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([] (future<> f) {
try {
Expand All @@ -187,10 +218,17 @@ class posix_socket_impl final : public socket_impl {
_fd = engine().make_pollable_fd(sa, proto);
return find_port_and_connect(sa, local, proto).then([fd = _fd, proto, allocator = _allocator] () mutable {
std::unique_ptr<connected_socket_impl> csi;
if (proto == transport::TCP) {
switch (proto) {
case transport::TCP:
csi.reset(new posix_connected_tcp_socket_impl(std::move(fd), allocator));
} else {
break;
case transport::SCTP:
csi.reset(new posix_connected_sctp_socket_impl(std::move(fd), allocator));
break;
case transport::UNIX:
// no support yet for unix-domain client side
return make_exception_future<connected_socket>(std::system_error(ESOCKTNOSUPPORT, std::system_category()));
;
}
return make_ready_future<connected_socket>(connected_socket(std::move(csi)));
});
Expand Down Expand Up @@ -367,16 +405,22 @@ posix_data_sink_impl::close() {

server_socket
posix_network_stack::listen(socket_address sa, listen_options opt) {
if (opt.proto == transport::TCP) {
switch (opt.proto) {
case transport::TCP:
default:
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt), _allocator))
:
server_socket(std::make_unique<posix_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba, _allocator));
} else {
case transport::SCTP:
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt), _allocator))
:
server_socket(std::make_unique<posix_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba, _allocator));
case transport::UNIX: {
auto server_imp = std::make_unique<posix_server_unix_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba, _allocator);
return server_socket(std::move(server_imp));
}
}
}

Expand All @@ -391,16 +435,23 @@ thread_local std::unordered_multimap<socket_address, typename posix_ap_server_so

server_socket
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
if (opt.proto == transport::TCP) {
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
server_socket(std::make_unique<posix_tcp_ap_server_socket_impl>(sa));
} else {
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
server_socket(std::make_unique<posix_sctp_ap_server_socket_impl>(sa));
switch (opt.proto) {

case transport::TCP:
default:
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
server_socket(std::make_unique<posix_tcp_ap_server_socket_impl>(sa));

case transport::SCTP:
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
server_socket(std::make_unique<posix_sctp_ap_server_socket_impl>(sa));

case transport::UNIX:
return server_socket(std::make_unique<posix_server_unix_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba));
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/net/stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ socket_address::socket_address(const ipv6_addr& addr)
std::copy(addr.ip.begin(), addr.ip.end(), u.in6.sin6_addr.s6_addr);
}

socket_address::socket_address(const ud_addr& addr)
{
u.un.sun_family = AF_UNIX;

// Note: I am limiting the socket name to 107, while Linux supports
// the non-portable limit of 108 (allowing strings without the terminating
// '\0'.
strncpy(u.un.sun_path, addr.sfile_.c_str(), sizeof(u.un.sun_path)-1);
u.un.sun_path[sizeof(u.un.sun_path)-1] = '\0';
}

socket_address::socket_address(uint32_t ipv4, uint16_t p)
: socket_address(make_ipv4_address(ipv4, p))
{}
Expand Down

0 comments on commit 2c364e9

Please sign in to comment.