diff --git a/src/iocore/eventsystem/CMakeLists.txt b/src/iocore/eventsystem/CMakeLists.txt index 656015c2f4f..63131bcf547 100644 --- a/src/iocore/eventsystem/CMakeLists.txt +++ b/src/iocore/eventsystem/CMakeLists.txt @@ -32,6 +32,7 @@ add_library( UnixEThread.cc UnixEvent.cc UnixEventProcessor.cc + UnixSocket.cc ConfigProcessor.cc RecRawStatsImpl.cc RecProcess.cc diff --git a/src/iocore/eventsystem/P_UnixSocketManager.h b/src/iocore/eventsystem/P_UnixSocketManager.h index 7ed78cead4a..5c65f73ad82 100644 --- a/src/iocore/eventsystem/P_UnixSocketManager.h +++ b/src/iocore/eventsystem/P_UnixSocketManager.h @@ -31,6 +31,8 @@ ****************************************************************************/ #pragma once +#include "UnixSocket.h" + #include "tscore/ink_platform.h" #include "tscore/ink_sock.h" #include "iocore/eventsystem/SocketManager.h" @@ -41,19 +43,6 @@ // 1024 - stdin, stderr, stdout #define EPOLL_MAX_DESCRIPTOR_SIZE 32768 -TS_INLINE bool -transient_error() -{ - bool transient = (errno == EINTR); -#ifdef ENOMEM - transient = transient || (errno == ENOMEM); -#endif -#ifdef ENOBUFS - transient = transient || (errno == ENOBUFS); -#endif - return transient; -} - TS_INLINE int SocketManager::open(const char *path, int oflag, mode_t mode) { @@ -71,80 +60,45 @@ SocketManager::open(const char *path, int oflag, mode_t mode) TS_INLINE int64_t SocketManager::read(int fd, void *buf, int size, void * /* pOLP ATS_UNUSED */) { - int64_t r; - do { - r = ::read(fd, buf, size); - if (likely(r >= 0)) { - break; - } - r = -errno; - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.read(buf, size); } TS_INLINE int SocketManager::recv(int fd, void *buf, int size, int flags) { - int r; - do { - if (unlikely((r = ::recv(fd, (char *)buf, size, flags)) < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.recv(buf, size, flags); } TS_INLINE int SocketManager::recvfrom(int fd, void *buf, int size, int flags, struct sockaddr *addr, socklen_t *addrlen) { - int r; - do { - r = ::recvfrom(fd, (char *)buf, size, flags, addr, addrlen); - if (unlikely(r < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.recvfrom(buf, size, flags, addr, addrlen); } TS_INLINE int SocketManager::recvmsg(int fd, struct msghdr *m, int flags, void * /* pOLP ATS_UNUSED */) { - int r; - do { - if (unlikely((r = ::recvmsg(fd, m, flags)) < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.recvmsg(m, flags); } #ifdef HAVE_RECVMMSG TS_INLINE int SocketManager::recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, struct timespec *timeout, void * /* pOLP ATS_UNUSED */) { - int r; - do { - if (unlikely((r = ::recvmmsg(fd, msgvec, vlen, flags, timeout)) < 0)) { - r = -errno; - // EINVAL can ocur if timeout is invalid. - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.recvmmsg(msgvec, vlen, flags, timeout); } #endif TS_INLINE int64_t SocketManager::write(int fd, void *buf, int size, void * /* pOLP ATS_UNUSED */) { - int64_t r; - do { - if (likely((r = ::write(fd, buf, size)) >= 0)) { - break; - } - r = -errno; - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.write(buf, size); } TS_INLINE int64_t @@ -162,37 +116,22 @@ SocketManager::pwrite(int fd, void *buf, int size, off_t offset, char * /* tag A TS_INLINE int SocketManager::send(int fd, void *buf, int size, int flags) { - int r; - do { - if (unlikely((r = ::send(fd, (char *)buf, size, flags)) < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.send(buf, size, flags); } TS_INLINE int SocketManager::sendto(int fd, void *buf, int len, int flags, struct sockaddr const *to, int tolen) { - int r; - do { - if (unlikely((r = ::sendto(fd, (char *)buf, len, flags, to, tolen)) < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.sendto(buf, len, flags, to, tolen); } TS_INLINE int SocketManager::sendmsg(int fd, struct msghdr *m, int flags, void * /* pOLP ATS_UNUSED */) { - int r; - do { - if (unlikely((r = ::sendmsg(fd, m, flags)) < 0)) { - r = -errno; - } - } while (r == -EINTR); - return r; + UnixSocket sock{fd}; + return sock.sendmsg(m, flags); } TS_INLINE int64_t @@ -222,54 +161,42 @@ SocketManager::fsync(int fildes) TS_INLINE int SocketManager::poll(struct pollfd *fds, unsigned long nfds, int timeout) { - int r; - do { - if ((r = ::poll(fds, nfds, timeout)) >= 0) { - break; - } - r = -errno; - } while (transient_error()); - return r; + return UnixSocket::poll(fds, nfds, timeout); } TS_INLINE int SocketManager::get_sndbuf_size(int s) { - int bsz = 0; - int bszsz, r; - - bszsz = sizeof(bsz); - r = safe_getsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&bsz, &bszsz); - return (r == 0 ? bsz : r); + UnixSocket sock{s}; + return sock.get_sndbuf_size(); } TS_INLINE int SocketManager::get_rcvbuf_size(int s) { - int bsz = 0; - int bszsz, r; - - bszsz = sizeof(bsz); - r = safe_getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&bsz, &bszsz); - return (r == 0 ? bsz : r); + UnixSocket sock{s}; + return sock.get_rcvbuf_size(); } TS_INLINE int SocketManager::set_sndbuf_size(int s, int bsz) { - return safe_setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&bsz, sizeof(bsz)); + UnixSocket sock{s}; + return sock.set_sndbuf_size(bsz); } TS_INLINE int SocketManager::set_rcvbuf_size(int s, int bsz) { - return safe_setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&bsz, sizeof(bsz)); + UnixSocket sock{s}; + return sock.set_rcvbuf_size(bsz); } TS_INLINE int SocketManager::getsockname(int s, struct sockaddr *sa, socklen_t *sz) { - return ::getsockname(s, sa, sz); + UnixSocket sock{s}; + return sock.getsockname(sa, sz); } TS_INLINE int @@ -281,11 +208,6 @@ SocketManager::socket(int domain, int type, int protocol) TS_INLINE int SocketManager::shutdown(int s, int how) { - int res; - do { - if (unlikely((res = ::shutdown(s, how)) < 0)) { - res = -errno; - } - } while (res == -EINTR); - return res; + UnixSocket sock{s}; + return sock.shutdown(how); } diff --git a/src/iocore/eventsystem/SocketManager.cc b/src/iocore/eventsystem/SocketManager.cc index f3057042f40..efc8611cf4b 100644 --- a/src/iocore/eventsystem/SocketManager.cc +++ b/src/iocore/eventsystem/SocketManager.cc @@ -25,95 +25,37 @@ SocketManager.cc ****************************************************************************/ + +#include "UnixSocket.h" + #include "tscore/ink_platform.h" #include "P_EventSystem.h" #include "tscore/TextBuffer.h" -#if !HAVE_ACCEPT4 -static int -accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) -{ - int fd, err; - - do { - fd = accept(sockfd, addr, addrlen); - if (likely(fd >= 0)) - break; - } while (transient_error()); - - if ((fd >= 0) && (flags & SOCK_CLOEXEC) && (safe_fcntl(fd, F_SETFD, FD_CLOEXEC) < 0)) { - err = errno; - close(fd); - errno = err; - return -1; - } - - if ((fd >= 0) && (flags & SOCK_NONBLOCK) && (safe_nonblocking(fd) < 0)) { - err = errno; - close(fd); - errno = err; - return -1; - } - - return fd; -} -#endif - int SocketManager::accept4(int s, struct sockaddr *addr, socklen_t *addrlen, int flags) { - do { - int fd = ::accept4(s, addr, addrlen, flags); - if (likely(fd >= 0)) { - return fd; - } - } while (transient_error()); - - return -errno; + UnixSocket sock{s}; + return sock.accept4(addr, addrlen, flags); } int -SocketManager::ink_bind(int s, struct sockaddr const *name, int namelen, short Proto) +SocketManager::ink_bind(int s, struct sockaddr const *name, int namelen, short /* Proto ATS_UNUSED */) { - (void)Proto; - return safe_bind(s, name, namelen); + UnixSocket sock{s}; + return sock.bind(name, namelen); } int SocketManager::close(int s) { - int res; - - if (s == 0) { - return -EACCES; - } else if (s < 0) { - return -EINVAL; - } - - do { - res = ::close(s); - if (res == -1) { - res = -errno; - } - } while (res == -EINTR); - return res; + UnixSocket sock{s}; + return sock.close(); } bool SocketManager::fastopen_supported() { - static const unsigned TFO_CLIENT_ENABLE = 1; - - ats_scoped_fd fd(::open("/proc/sys/net/ipv4/tcp_fastopen", O_RDONLY)); - int value = 0; - - if (fd) { - TextBuffer buffer(16); - - buffer.slurp(fd.get()); - value = atoi(buffer.bufPtr()); - } - - return value & TFO_CLIENT_ENABLE; + return UnixSocket::client_fastopen_supported(); } diff --git a/src/iocore/eventsystem/UnixSocket.cc b/src/iocore/eventsystem/UnixSocket.cc new file mode 100644 index 00000000000..6a750e5b9bf --- /dev/null +++ b/src/iocore/eventsystem/UnixSocket.cc @@ -0,0 +1,140 @@ +/** @file + + Provides a wrapper for a Unix socket. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UnixSocket.h" + +#include "tscore/ink_apidefs.h" +#include "tscore/ink_config.h" +#include "tscore/TextBuffer.h" +#include "tscore/ink_memory.h" +#include "tscore/ink_platform.h" +#include "tscore/ink_sock.h" + +#include + +namespace +{ +enum class TCPFastopenMask { + CLIENT_ENABLE = 1, + SERVER_ENABLE, + CLIENT_NOCOOKIE, + SERVER_NOCOOKIE, + SERVER_IMPLICIT_ENABLE, + + MAX_VALUE, +}; +} // end anonymous namespace + +#if !HAVE_ACCEPT4 +static int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); +#endif +static unsigned int read_uint_from_fd(int fd); + +int +UnixSocket::bind(struct sockaddr const *name, int namelen) +{ + return safe_bind(this->sock_fd, name, namelen); +} + +int +UnixSocket::accept4(struct sockaddr *addr, socklen_t *addrlen, int flags) const +{ + do { + int fd = ::accept4(this->sock_fd, addr, addrlen, flags); + if (likely(fd >= 0)) { + return fd; + } + } while (transient_error()); + + return -errno; +} + +#if !HAVE_ACCEPT4 +static int +accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + int fd, err; + + do { + fd = accept(sockfd, addr, addrlen); + if (likely(fd >= 0)) + break; + } while (transient_error()); + + if ((fd >= 0) && (flags & SOCK_CLOEXEC) && (safe_fcntl(fd, F_SETFD, FD_CLOEXEC) < 0)) { + err = errno; + close(fd); + errno = err; + return -1; + } + + if ((fd >= 0) && (flags & SOCK_NONBLOCK) && (safe_nonblocking(fd) < 0)) { + err = errno; + close(fd); + errno = err; + return -1; + } + + return fd; +} +#endif // !HAVE_ACCEPT4 + +int +UnixSocket::close() +{ + int res; + + if (this->sock_fd == 0) { + return -EACCES; + } else if (this->sock_fd < 0) { + return -EINVAL; + } + + do { + res = ::close(this->sock_fd); + if (res == -1) { + res = -errno; + } + } while (res == -EINTR); + return res; +} + +bool +UnixSocket::client_fastopen_supported() +{ + ats_scoped_fd fd{::open("/proc/sys/net/ipv4/tcp_fastopen", O_RDONLY)}; + unsigned int bitfield{read_uint_from_fd(fd.get())}; + return bitfield & static_cast(TCPFastopenMask::CLIENT_ENABLE); +} + +static unsigned int +read_uint_from_fd(int fd) +{ + int result{}; + if (fd) { + TextBuffer buffer(16); + buffer.slurp(fd); + result = std::atoi(buffer.bufPtr()); + } + return static_cast(result); +} diff --git a/src/iocore/eventsystem/UnixSocket.h b/src/iocore/eventsystem/UnixSocket.h new file mode 100644 index 00000000000..7d80112dbd1 --- /dev/null +++ b/src/iocore/eventsystem/UnixSocket.h @@ -0,0 +1,308 @@ +/** @file + + Provides a wrapper for a Unix socket. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "tscore/ink_apidefs.h" +#include "tscore/ink_platform.h" +#include "tscore/ink_sock.h" + +#include + +#define NO_SOCK -1 + +#ifndef SOCK_NONBLOCK +#define SOCK_NONBLOCK O_NONBLOCK +#endif + +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC O_CLOEXEC +#endif + +#ifndef MSG_FASTOPEN +#if defined(__linux__) +#define MSG_FASTOPEN 0x20000000 +#else +#define MSG_FASTOPEN 0 +#endif +#endif + +bool transient_error(); + +class UnixSocket +{ +public: + UnixSocket(int fd); + + /** Get a new socket. + * + * Call has_socket() to determine whether this call succeeded. If the call + * failed, errno will be set to indicate the error. + * + * @see has_socket + */ + UnixSocket(int domain, int ctype, int protocol); + + bool has_socket() const; + + int bind(struct sockaddr const *name, int namelen); + int accept4(struct sockaddr *addr, socklen_t *addrlen, int flags) const; + + std::int64_t read(void *buf, int size) const; + + int recv(void *buf, int size, int flags) const; + int recvfrom(void *buf, int size, int flags, struct sockaddr *addr, socklen_t *addrlen) const; + int recvmsg(struct msghdr *m, int flags) const; +#ifdef HAVE_RECVMMSG + int recvmmsg(struct mmsghdr *msgvec, int vlen, int flags, struct timespec *timeout) const; +#endif + + std::int64_t write(void *buf, int size) const; + + int send(void *buf, int size, int flags) const; + int sendto(void *buf, int size, int flags, struct sockaddr const *to, int tolen) const; + int sendmsg(struct msghdr const *m, int flags) const; + + static int poll(struct pollfd *fds, unsigned long nfds, int timeout); + + int getsockname(struct sockaddr *sa, socklen_t *sz) const; + + int get_sndbuf_size() const; + int get_rcvbuf_size() const; + int set_sndbuf_size(int bsz); + int set_rcvbuf_size(int bsz); + + int close(); + int shutdown(int how); + + static bool client_fastopen_supported(); + +private: + int sock_fd{NO_SOCK}; +}; + +inline UnixSocket::UnixSocket(int fd) : sock_fd{fd} {} + +inline UnixSocket::UnixSocket(int domain, int type, int protocol) +{ + this->sock_fd = socket(domain, type, protocol); +} + +inline bool +UnixSocket::has_socket() const +{ + return NO_SOCK != this->sock_fd; +} + +inline std::int64_t +UnixSocket::read(void *buf, int size) const +{ + std::int64_t r; + do { + r = ::read(this->sock_fd, buf, size); + if (likely(r >= 0)) { + break; + } + r = -errno; + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::recv(void *buf, int size, int flags) const +{ + int r; + do { + if (unlikely((r = ::recv(this->sock_fd, static_cast(buf), size, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::recvfrom(void *buf, int size, int flags, struct sockaddr *addr, socklen_t *addrlen) const +{ + int r; + do { + r = ::recvfrom(this->sock_fd, static_cast(buf), size, flags, addr, addrlen); + if (unlikely(r < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::recvmsg(struct msghdr *m, int flags) const +{ + int r; + do { + if (unlikely((r = ::recvmsg(this->sock_fd, m, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +#ifdef HAVE_RECVMMSG +inline int +UnixSocket::recvmmsg(struct mmsghdr *msgvec, int vlen, int flags, struct timespec *timeout) const +{ + int r; + do { + if (unlikely((r = ::recvmmsg(this->sock_fd, msgvec, vlen, flags, timeout)) < 0)) { + r = -errno; + // EINVAL can ocur if timeout is invalid. + } + } while (r == -EINTR); + return r; +} +#endif + +inline std::int64_t +UnixSocket::write(void *buf, int size) const +{ + std::int64_t r; + do { + if (likely((r = ::write(this->sock_fd, buf, size)) >= 0)) { + break; + } + r = -errno; + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::send(void *buf, int size, int flags) const +{ + int r; + do { + if (unlikely((r = ::send(this->sock_fd, static_cast(buf), size, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::sendto(void *buf, int len, int flags, struct sockaddr const *to, int tolen) const +{ + int r; + do { + if (unlikely((r = ::sendto(this->sock_fd, (char *)buf, len, flags, to, tolen)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::sendmsg(struct msghdr const *m, int flags) const +{ + int r; + do { + if (unlikely((r = ::sendmsg(this->sock_fd, m, flags)) < 0)) { + r = -errno; + } + } while (r == -EINTR); + return r; +} + +inline int +UnixSocket::poll(struct pollfd *fds, unsigned long nfds, int timeout) +{ + int r; + do { + if ((r = ::poll(fds, nfds, timeout)) >= 0) { + break; + } + r = -errno; + } while (transient_error()); + return r; +} + +inline int +UnixSocket::getsockname(struct sockaddr *sa, socklen_t *sz) const +{ + return ::getsockname(this->sock_fd, sa, sz); +} + +inline int +UnixSocket::get_sndbuf_size() const +{ + int bsz = 0; + int bszsz, r; + + bszsz = sizeof(bsz); + r = safe_getsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, (char *)&bsz, &bszsz); + return (r == 0 ? bsz : r); +} + +inline int +UnixSocket::get_rcvbuf_size() const +{ + int bsz = 0; + int bszsz, r; + + bszsz = sizeof(bsz); + r = safe_getsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, (char *)&bsz, &bszsz); + return (r == 0 ? bsz : r); +} + +inline int +UnixSocket::set_sndbuf_size(int bsz) +{ + return safe_setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, (char *)&bsz, sizeof(bsz)); +} + +inline int +UnixSocket::set_rcvbuf_size(int bsz) +{ + return safe_setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, (char *)&bsz, sizeof(bsz)); +} + +inline int +UnixSocket::shutdown(int how) +{ + int res; + do { + if (unlikely((res = ::shutdown(this->sock_fd, how)) < 0)) { + res = -errno; + } + } while (res == -EINTR); + return res; +} + +inline bool +transient_error() +{ + bool transient = (errno == EINTR); +#ifdef ENOMEM + transient = transient || (errno == ENOMEM); +#endif +#ifdef ENOBUFS + transient = transient || (errno == ENOBUFS); +#endif + return transient; +}