From c62e46d1b44360d3a5eccfab8d0536dbeb76cbfa Mon Sep 17 00:00:00 2001 From: DMG Date: Wed, 24 Jul 2024 15:42:53 -0700 Subject: [PATCH] Protect dispatchPeer and toWrite Queue If Opening Connection Closes Previously, for OpenBSD there was an intermittent issue with Listener::dispatchPeer (which setups a new peer for a new connection) when the client managed to close the connection before dispatchPeer had finished executing. This was causing a throw for peer->fd(), the actual-fd (derived from fd) being used an index into the set of peers, since the peer's fd was empty due to the close. In the changed code, we allow peer-fd() to return an empty fd without a throw, but also use the transport::toWriteLock to guard against the close messing up write attempts and/or new-connection dispatch attempts. Specific changes: peer.h/peer.c: Provide an actualFd() method for peer which can be used when the caller just wants the actual-fd and doesn't need to know the intermediate "fd" value (which the caller would have been previously fetching just to then call getActualFd). Allows us to protect "fd" from getting closed while that actualFd() method is executing. listener.cc: Uses peer->actualFd() as described above. transport.h/transport.cc: Add closeFd(Fd) method to transport.cc. peer.cc then uses this to close the Fd within peer::closeFd(). This allows the transport code to clean its toWrite queue upon the Fd closing. transport.cc: The "clean up buffers" for toWrite is removed from the face of Transport::removePeer, now done in closeFd(Fd). There are also a few places where we check for an Fd being empty, and some comments added pointing out where Fd is now allowed to be empty, where before the empty Fd would have been prevented by a throw. --- include/pistache/peer.h | 3 +- include/pistache/transport.h | 13 ++-- src/common/http.cc | 2 +- src/common/peer.cc | 48 ++++++++++---- src/common/transport.cc | 122 ++++++++++++++++++++++------------- src/server/listener.cc | 33 +++++++++- version.txt | 2 +- 7 files changed, 158 insertions(+), 65 deletions(-) diff --git a/include/pistache/peer.h b/include/pistache/peer.h index 936748d2e..477999bb7 100644 --- a/include/pistache/peer.h +++ b/include/pistache/peer.h @@ -52,7 +52,8 @@ namespace Pistache::Tcp const Address& address() const; const std::string& hostname(); - Fd fd() const; + Fd fd() const; // can return PS_FD_EMPTY + int actualFd() const; // can return -1 void closeFd(); diff --git a/include/pistache/transport.h b/include/pistache/transport.h index 887ae970b..a26009651 100644 --- a/include/pistache/transport.h +++ b/include/pistache/transport.h @@ -12,10 +12,10 @@ #pragma once -#include -#include #include #include +#include +#include #include #include @@ -59,8 +59,11 @@ namespace Pistache::Tcp #endif ) { - // Always enqueue reponses for sending. Giving preference to consumer - // context means chunked responses could be sent out of order. + // Always enqueue reponses for sending. Giving preference to + // consumer context means chunked responses could be sent out of + // order. + // + // Note: fd could be PS_FD_EMPTY return Async::Promise( [=](Async::Deferred deferred) mutable { BufferHolder holder { buffer }; @@ -110,6 +113,8 @@ namespace Pistache::Tcp } #endif + void closeFd(Fd fd); + // !!!! Make protected like removePeer void removeAllPeers(); // cleans up toWrite and does CLOSE_FD on each diff --git a/src/common/http.cc b/src/common/http.cc index dbafafe25..d4293605f 100644 --- a/src/common/http.cc +++ b/src/common/http.cc @@ -1180,7 +1180,7 @@ namespace Pistache::Http auto* transport = writer.transport_; auto peer = writer.peer(); - auto sockFd = peer->fd(); + auto sockFd = peer->fd(); // may be PS_FD_EMPTY auto buffer = buf->buffer(); return transport->asyncWrite(sockFd, buffer, diff --git a/src/common/peer.cc b/src/common/peer.cc index 51002ea5e..82b0c2e2d 100644 --- a/src/common/peer.cc +++ b/src/common/peer.cc @@ -19,8 +19,8 @@ #include #include -#include #include +#include namespace Pistache::Tcp { @@ -40,15 +40,13 @@ namespace Pistache::Tcp , ssl_(ssl) , id_(getUniqueId()) { - PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) - ", Address ptr %p, ssl %p", + PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", Address ptr %p, ssl %p", this, fd, &addr, ssl); } Peer::~Peer() { - PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) - ", Address ptr %p, ssl %p", + PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", Address ptr %p, ssl %p", this, fd_, &addr, ssl_); closeFd(); // does nothing if already closed @@ -117,24 +115,52 @@ namespace Pistache::Tcp Fd Peer::fd() const { - if (fd_ == PS_FD_EMPTY) + Fd res_fd(fd_); + + if (res_fd == PS_FD_EMPTY) + { + PS_LOG_DEBUG_ARGS("peer %p has no associated fd", this); + return (PS_FD_EMPTY); + } + + return res_fd; + } + + int Peer::actualFd() const // can return -1 + { + Fd this_fd(fd_); + + if (this_fd == PS_FD_EMPTY) { PS_LOG_DEBUG_ARGS("peer %p has no associated fd", this); - throw std::runtime_error("The peer has no associated fd"); + return (-1); } - return fd_; + return (GET_ACTUAL_FD(this_fd)); } void Peer::closeFd() { + PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD), this, fd_); - - if (fd_ != PS_FD_EMPTY) + + auto this_fd = fd_; + + if (this_fd != PS_FD_EMPTY) { - CLOSE_FD(fd_); fd_ = PS_FD_EMPTY; + + if (transport_) + { + // Getting transport to do the close allows transport to clean + // up any transport usage of the Fd, e.g. in the write queue + transport_->closeFd(this_fd); + } + else + { + CLOSE_FD(this_fd); + } } } diff --git a/src/common/transport.cc b/src/common/transport.cc index 1278b57b0..e0f4564b6 100644 --- a/src/common/transport.cc +++ b/src/common/transport.cc @@ -127,11 +127,16 @@ namespace Pistache::Tcp { handlePeer(peer); } + + Guard guard(toWriteLock); Fd fd = peer->fd(); + if (fd == PS_FD_EMPTY) { - Guard guard(toWriteLock); - toWrite.emplace(fd, std::deque {}); + PS_LOG_DEBUG("Empty Fd"); + return; } + + toWrite.emplace(fd, std::deque {}); } #ifdef DEBUG @@ -269,10 +274,21 @@ namespace Pistache::Tcp void Transport::handleIncoming(const std::shared_ptr& peer) { + if (!peer) + { + PS_LOG_DEBUG("Null peer"); + return; + } + char buffer[Const::MaxBuffer] = { 0 }; ssize_t totalBytes = 0; - int fdactual = GET_ACTUAL_FD(peer->fd()); + int fdactual = peer->actualFd(); + if (fdactual < 0) + { + PS_LOG_DEBUG_ARGS("Peer %p has no actual Fd", peer.get()); + return; + } for (;;) { @@ -341,7 +357,11 @@ namespace Pistache::Tcp void Transport::removePeer(const std::shared_ptr& peer) { Fd fd = peer->fd(); - + if (fd == PS_FD_EMPTY) + { + PS_LOG_DEBUG("Empty Fd"); + return; + } { // See comment in transport.h on why peers_ must be mutex-protected std::lock_guard l_guard(peers_mutex_); @@ -357,12 +377,6 @@ namespace Pistache::Tcp } } - { - // Clean up buffers - Guard guard(toWriteLock); - toWrite.erase(fd); - } - // Don't rely on close deleting this FD from the epoll "interest" list. // This is needed in case the FD has been shared with another process. // Sharing should no longer happen by accident as SOCK_CLOEXEC is now set on @@ -376,10 +390,24 @@ namespace Pistache::Tcp peer->closeFd(); } + void Transport::closeFd(Fd fd) + { + if (fd == PS_FD_EMPTY) + { + PS_LOG_DEBUG("Trying to close empty Fd"); + return; + } + + Guard guard(toWriteLock); + toWrite.erase(fd); // Clean up write buffers + + CLOSE_FD(fd); + } + void Transport::removeAllPeers() { PS_TIMEDBG_START_THIS; - + for (;;) { std::shared_ptr peer; @@ -399,7 +427,7 @@ namespace Pistache::Tcp } } - removePeer(peer);// removePeer locks mutex, erases peer from peers_ + removePeer(peer); // removePeer locks mutex, erases peer from peers_ } } @@ -554,9 +582,9 @@ namespace Pistache::Tcp #ifdef __NetBSD__ { // encapsulate - int tcp_nodelay = 0; - sock_opt_res = getsockopt(GET_ACTUAL_FD(fd), tcp_prot_num_, - TCP_NODELAY, &tcp_nodelay, &len); + int tcp_nodelay = 0; + sock_opt_res = getsockopt(GET_ACTUAL_FD(fd), tcp_prot_num_, + TCP_NODELAY, &tcp_nodelay, &len); if (sock_opt_res == 0) tcp_no_push = !tcp_nodelay; } @@ -569,7 +597,7 @@ namespace Pistache::Tcp #endif &tcp_no_push, &len); #endif // of ifdef __NetBSD__ ... else - + if (sock_opt_res == 0) { if (((tcp_no_push == 0) && (msg_more_style)) || ((tcp_no_push != 0) && (!msg_more_style))) @@ -680,16 +708,16 @@ namespace Pistache::Tcp // https://www.man7.org/linux/man-pages/man2/sendfile.2.html // Copies FROM "in_fd" TO "out_fd" // Returns number of bytes written on success, -1 with errno set on error - // + // // If offset is not NULL, then sendfile() does not modify the file offset // of in_fd; otherwise the file offset is adjusted to reflect the number of // bytes read from in_fd. - ssize_t my_sendfile(int out_fd, int in_fd, off_t *offset, size_t count) + ssize_t my_sendfile(int out_fd, int in_fd, off_t* offset, size_t count) { - char buff[65536+16]; + char buff[65536 + 16]; - int read_errors = 0; - int write_errors = 0; + int read_errors = 0; + int write_errors = 0; ssize_t bytes_written_res = 0; off_t in_fd_start_pos = -1; @@ -700,32 +728,30 @@ namespace Pistache::Tcp if (in_fd_start_pos < 0) { PS_LOG_DEBUG("lseek error"); - return(in_fd_start_pos); + return (in_fd_start_pos); } - if (lseek(in_fd, *offset, SEEK_SET) < 0) { PS_LOG_DEBUG("lseek error"); - return(-1); + return (-1); } } - for(;;) + for (;;) { - size_t bytes_to_read = count ? - std::min(sizeof(buff)-16, count) : (sizeof(buff)-16); - + size_t bytes_to_read = count ? std::min(sizeof(buff) - 16, count) : (sizeof(buff) - 16); + ssize_t bytes_read = read(in_fd, &(buff[0]), bytes_to_read); if (bytes_read == 0) // End of file break; - + if (bytes_read < 0) { if ((errno == EINTR) || (errno == EAGAIN)) { PS_LOG_DEBUG("read-interrupted error"); - + read_errors++; if (read_errors < 256) continue; @@ -740,10 +766,10 @@ namespace Pistache::Tcp read_errors = 0; bool re_adjust_pos = false; - + if ((count) && (bytes_read > ((ssize_t)count))) { - bytes_read = ((ssize_t)count); + bytes_read = ((ssize_t)count); re_adjust_pos = true; } @@ -760,11 +786,10 @@ namespace Pistache::Tcp ssize_t bytes_written = write(out_fd, p, bytes_read); if (bytes_written <= 0) { - if ((bytes_written == 0) || (errno == EINTR) || - (errno == EAGAIN)) + if ((bytes_written == 0) || (errno == EINTR) || (errno == EAGAIN)) { PS_LOG_DEBUG("write-interrupted error"); - + write_errors++; if (write_errors < 256) continue; @@ -777,7 +802,7 @@ namespace Pistache::Tcp break; } write_errors = 0; - + bytes_read -= bytes_written; p += bytes_written; bytes_written_res += bytes_written; @@ -789,16 +814,15 @@ namespace Pistache::Tcp // if offset non null, set in_fd file pos to pos from start of this // function - if ((offset) && (bytes_written_res >= 0) && - (lseek(in_fd, in_fd_start_pos, SEEK_SET) < 0)) + if ((offset) && (bytes_written_res >= 0) && (lseek(in_fd, in_fd_start_pos, SEEK_SET) < 0)) { PS_LOG_DEBUG("lseek error"); bytes_written_res = -1; } - return(bytes_written_res); + return (bytes_written_res); } - + #endif // ifdef _IS_BSD ssize_t Transport::sendFile(Fd fd, int file, off_t offset, size_t len) @@ -836,7 +860,7 @@ namespace Pistache::Tcp if (it_second_ssl_is_null) { #ifdef DEBUG - const char * sendfile_fn_name = + const char* sendfile_fn_name = #ifdef _IS_BSD "my_sendfile"; #else @@ -846,9 +870,8 @@ namespace Pistache::Tcp #endif /* PISTACHE_USE_SSL */ PS_LOG_DEBUG_ARGS( - "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) - " actual-fd %d, file fd %d, len %d", sendfile_fn_name, - fd, GET_ACTUAL_FD(fd), file, len); + "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) " actual-fd %d, file fd %d, len %d", sendfile_fn_name, + fd, GET_ACTUAL_FD(fd), file, len); #ifdef _USE_LIBEVENT_LIKE_APPLE // !!!! Should we do configureMsgMoreStyle for SSL as well? And @@ -885,9 +908,9 @@ namespace Pistache::Tcp #else #ifdef _IS_BSD - bytesWritten = my_sendfile(GET_ACTUAL_FD(fd), file, &offset, len); + bytesWritten = my_sendfile(GET_ACTUAL_FD(fd), file, &offset, len); #else - bytesWritten = ::sendfile(GET_ACTUAL_FD(fd), file, &offset, len); + bytesWritten = ::sendfile(GET_ACTUAL_FD(fd), file, &offset, len); #endif #endif @@ -1002,6 +1025,8 @@ namespace Pistache::Tcp break; auto fd = write->peerFd; + if (fd == PS_FD_EMPTY) + continue; if (!isPeerFd(fd)) continue; @@ -1052,6 +1077,11 @@ namespace Pistache::Tcp PS_TIMEDBG_START_THIS; Fd fd = peer->fd(); + if (fd == PS_FD_EMPTY) + { + PS_LOG_DEBUG("Empty Fd"); + return; + } { // See comment in transport.h on why peers_ must be mutex-protected diff --git a/src/server/listener.cc b/src/server/listener.cc index a5bc57cdc..410c79af9 100644 --- a/src/server/listener.cc +++ b/src/server/listener.cc @@ -878,8 +878,39 @@ namespace Pistache::Tcp { PS_TIMEDBG_START_THIS; + if (!peer) + { + PS_LOG_DEBUG("Null peer"); + return; + } + + // There is some risk that the Fd belonging to the peer could be closed + // in another thread before this dispatchPeer routine completes. In + // particular, that has been seen to happen occasionally in + // rest_server_test.response_status_code_test in OpenBSD. + // + // To guard against that, we simply need to check for an invalid Fd. We + // also check for an invalid actual-fd for safety's sake. + + int actual_fd = -1; + try + { + actual_fd = peer->actualFd(); + } + catch (...) + { + PS_LOG_INFO_ARGS("Failed to get actual fd from peer %p", + peer.get()); + return; + } + if (actual_fd == -1) + { + PS_LOG_INFO_ARGS("No actual fd for peer %p", peer.get()); + return; + } + auto handlers = reactor_->handlers(transportKey); - auto idx = (GET_ACTUAL_FD(peer->fd())) % handlers.size(); + auto idx = actual_fd % handlers.size(); auto transport = std::static_pointer_cast(handlers[idx]); transport->handleNewPeer(peer); diff --git a/version.txt b/version.txt index 198aaa4da..b4aaa9407 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.4.1.20240722 +0.4.1.20240724