Skip to content

Commit

Permalink
Protect dispatchPeer and toWrite Queue If Opening Connection Closes
Browse files Browse the repository at this point in the history
Previously, for OpenBSD there was an intermittent issue with
Listener::dispatchPeer (which setups a new peer for a new connection)
when the client managed to close the connection before dispatchPeer
had finished executing. This was causing a throw for peer->fd(), the
actual-fd (derived from fd) being used an index into the set of peers,
since the peer's fd was empty due to the close.

In the changed code, we allow peer-fd() to return an empty fd without a
throw, but also use the transport::toWriteLock to guard against the
close messing up write attempts and/or new-connection dispatch
attempts.

Specific changes:

peer.h/peer.c: Provide an actualFd() method for peer which can be used
when the caller just wants the actual-fd and doesn't need to know the
intermediate "fd" value (which the caller would have been previously
fetching just to then call getActualFd). Allows us to protect "fd"
from getting closed while that actualFd() method is executing.

listener.cc: Uses peer->actualFd() as described above.

transport.h/transport.cc: Add closeFd(Fd) method to
transport.cc. peer.cc then uses this to close the Fd within
peer::closeFd(). This allows the transport code to clean its toWrite
queue upon the Fd closing.

transport.cc: The "clean up buffers" for toWrite is removed from the
face of Transport::removePeer, now done in closeFd(Fd).

There are also a few places where we check for an Fd being empty, and
some comments added pointing out where Fd is now allowed to be empty,
where before the empty Fd would have been prevented by a throw.
  • Loading branch information
dgreatwood committed Jul 24, 2024
1 parent 46d3be1 commit c62e46d
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 65 deletions.
3 changes: 2 additions & 1 deletion include/pistache/peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ namespace Pistache::Tcp

const Address& address() const;
const std::string& hostname();
Fd fd() const;
Fd fd() const; // can return PS_FD_EMPTY
int actualFd() const; // can return -1

void closeFd();

Expand Down
13 changes: 9 additions & 4 deletions include/pistache/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

#pragma once

#include <pistache/pist_timelog.h>
#include <pistache/pist_quote.h>
#include <pistache/async.h>
#include <pistache/mailbox.h>
#include <pistache/pist_quote.h>
#include <pistache/pist_timelog.h>
#include <pistache/reactor.h>
#include <pistache/stream.h>

Expand Down Expand Up @@ -59,8 +59,11 @@ namespace Pistache::Tcp
#endif
)
{
// Always enqueue reponses for sending. Giving preference to consumer
// context means chunked responses could be sent out of order.
// Always enqueue reponses for sending. Giving preference to
// consumer context means chunked responses could be sent out of
// order.
//
// Note: fd could be PS_FD_EMPTY
return Async::Promise<ssize_t>(
[=](Async::Deferred<ssize_t> deferred) mutable {
BufferHolder holder { buffer };
Expand Down Expand Up @@ -110,6 +113,8 @@ namespace Pistache::Tcp
}
#endif

void closeFd(Fd fd);

// !!!! Make protected like removePeer
void removeAllPeers(); // cleans up toWrite and does CLOSE_FD on each

Expand Down
2 changes: 1 addition & 1 deletion src/common/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ namespace Pistache::Http

auto* transport = writer.transport_;
auto peer = writer.peer();
auto sockFd = peer->fd();
auto sockFd = peer->fd(); // may be PS_FD_EMPTY

auto buffer = buf->buffer();
return transport->asyncWrite(sockFd, buffer,
Expand Down
48 changes: 37 additions & 11 deletions src/common/peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

#include <pistache/async.h>
#include <pistache/peer.h>
#include <pistache/transport.h>
#include <pistache/pist_quote.h>
#include <pistache/transport.h>

namespace Pistache::Tcp
{
Expand All @@ -40,15 +40,13 @@ namespace Pistache::Tcp
, ssl_(ssl)
, id_(getUniqueId())
{
PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD)
", Address ptr %p, ssl %p",
PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", Address ptr %p, ssl %p",
this, fd, &addr, ssl);
}

Peer::~Peer()
{
PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD)
", Address ptr %p, ssl %p",
PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", Address ptr %p, ssl %p",
this, fd_, &addr, ssl_);

closeFd(); // does nothing if already closed
Expand Down Expand Up @@ -117,24 +115,52 @@ namespace Pistache::Tcp

Fd Peer::fd() const
{
if (fd_ == PS_FD_EMPTY)
Fd res_fd(fd_);

if (res_fd == PS_FD_EMPTY)
{
PS_LOG_DEBUG_ARGS("peer %p has no associated fd", this);
return (PS_FD_EMPTY);
}

return res_fd;
}

int Peer::actualFd() const // can return -1
{
Fd this_fd(fd_);

if (this_fd == PS_FD_EMPTY)
{
PS_LOG_DEBUG_ARGS("peer %p has no associated fd", this);
throw std::runtime_error("The peer has no associated fd");
return (-1);
}

return fd_;
return (GET_ACTUAL_FD(this_fd));
}

void Peer::closeFd()
{

PS_LOG_DEBUG_ARGS("peer %p, fd %" PIST_QUOTE(PS_FD_PRNTFCD),
this, fd_);

if (fd_ != PS_FD_EMPTY)

auto this_fd = fd_;

if (this_fd != PS_FD_EMPTY)
{
CLOSE_FD(fd_);
fd_ = PS_FD_EMPTY;

if (transport_)
{
// Getting transport to do the close allows transport to clean
// up any transport usage of the Fd, e.g. in the write queue
transport_->closeFd(this_fd);
}
else
{
CLOSE_FD(this_fd);
}
}
}

Expand Down
Loading

0 comments on commit c62e46d

Please sign in to comment.