Skip to content

Commit

Permalink
Down the rabbit-hole of DeferredParams clientStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
yadij committed Jul 1, 2024
1 parent 9fd23e1 commit 7516249
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 80 deletions.
6 changes: 3 additions & 3 deletions src/Downloader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Downloader::doneAll() const

static void
downloaderRecipient(clientStreamNode * node, ClientHttpRequest * http,
HttpReply * rep, StoreIOBuffer receivedData)
const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
debugs(33, 6, MYNAME);
/* Test preconditions */
Expand All @@ -120,7 +120,7 @@ downloaderRecipient(clientStreamNode * node, ClientHttpRequest * http,
assert(context);

if (context->downloader.valid())
context->downloader->handleReply(node, http, rep, receivedData);
context->downloader->handleReply(node, http, reply, receivedData);
}

static void
Expand Down Expand Up @@ -188,7 +188,7 @@ Downloader::start()
}

void
Downloader::handleReply(clientStreamNode * node, ClientHttpRequest *http, HttpReply *reply, StoreIOBuffer receivedData)
Downloader::handleReply(clientStreamNode * node, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
DownloaderContext::Pointer callerContext = dynamic_cast<DownloaderContext *>(node->data.getRaw());
// TODO: remove the following check:
Expand Down
5 changes: 4 additions & 1 deletion src/Downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class Downloader: virtual public AsyncJob
/// The nested level of Downloader object (downloads inside downloads).
unsigned int nestedLevel() const {return level_;}

void handleReply(clientStreamNode *, ClientHttpRequest *, HttpReply *, StoreIOBuffer);
/* clientStreams API */

/// \copydoc CSCB
void handleReply(clientStreamNode *, ClientHttpRequest *, const HttpReplyPointer &, StoreIOBuffer);

protected:

Expand Down
7 changes: 2 additions & 5 deletions src/clientStreamForward.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,18 @@
#define SQUID_SRC_CLIENTSTREAMFORWARD_H

#include "enums.h" /* for clientStream_status_t */

class Lock;
template <class C> class RefCount;
#include "http/forward.h"

typedef RefCount<Lock> ClientStreamData;

/* Callbacks for ClientStreams API */

class clientStreamNode;
class ClientHttpRequest;
class HttpReply;
class StoreIOBuffer;

/// client stream read callback
typedef void CSCB(clientStreamNode *, ClientHttpRequest *, HttpReply *, StoreIOBuffer);
typedef void CSCB(clientStreamNode *, ClientHttpRequest *, const HttpReplyPointer &, StoreIOBuffer);

/// client stream read
typedef void CSR(clientStreamNode *, ClientHttpRequest *);
Expand Down
10 changes: 5 additions & 5 deletions src/client_side.cc
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ ClientHttpRequest::rangeBoundaryStr() const
*/
void
clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,
HttpReply * rep, StoreIOBuffer receivedData)
const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
// do not try to deliver if client already ABORTED
if (!http->getConn() || !cbdataReferenceValid(http->getConn()) || !Comm::IsConnOpen(http->getConn()->clientConnection))
Expand All @@ -819,11 +819,11 @@ clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,

// TODO: enforces HTTP/1 MUST on pipeline order, but is irrelevant to HTTP/2
if (context != http->getConn()->pipeline.front())
context->deferRecipientForLater(node, rep, receivedData);
context->deferRecipientForLater(node, reply, receivedData);
else if (http->getConn()->cbControlMsgSent) // 1xx to the user is pending
context->deferRecipientForLater(node, rep, receivedData);
context->deferRecipientForLater(node, reply, receivedData);
else
http->getConn()->handleReply(rep, receivedData);
http->getConn()->handleReply(reply, receivedData);
}

/**
Expand Down Expand Up @@ -3628,7 +3628,7 @@ ConnStateData::sendControlMsg(HttpControlMsg msg)
typedef CommCbMemFunT<HttpControlMsgSink, CommIoCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, HttpControlMsgSink::wroteControlMsg);

if (!writeControlMsgAndCall(rep.getRaw(), call)) {
if (!writeControlMsgAndCall(rep, call)) {
// but still inform the caller (so it may resume its operation)
doneWithControlMsg();
}
Expand Down
4 changes: 2 additions & 2 deletions src/client_side.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,11 @@ class ConnStateData:
void add(const Http::StreamPointer &context);

/// handle a control message received by context from a peer and call back
virtual bool writeControlMsgAndCall(HttpReply *rep, AsyncCall::Pointer &call) = 0;
virtual bool writeControlMsgAndCall(const HttpReplyPointer &, AsyncCall::Pointer &) = 0;

/// ClientStream calls this to supply response header (once) and data
/// for the current Http::Stream.
virtual void handleReply(HttpReply *header, StoreIOBuffer receivedData) = 0;
virtual void handleReply(const HttpReplyPointer &, StoreIOBuffer receivedData) = 0;

/// remove no longer needed leading bytes from the input buffer
void consumeInput(const size_t byteCount);
Expand Down
8 changes: 4 additions & 4 deletions src/esi/Esi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ esiStreamDetach (clientStreamNode *thisNode, ClientHttpRequest *http)
* There is context data or a reply structure
*/
void
esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply *rep, StoreIOBuffer receivedData)
esiProcessStream(clientStreamNode *thisNode, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
/* test preconditions */
assert (thisNode != nullptr);
Expand All @@ -684,7 +684,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply

if (!thisNode->data.getRaw())
/* setup ESI context from reply headers */
thisNode->data = ESIContextNew(rep, thisNode, http);
thisNode->data = ESIContextNew(reply, thisNode, http);

ESIContext::Pointer context = dynamic_cast<ESIContext *>(thisNode->data.getRaw());

Expand All @@ -697,7 +697,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply
* has been detected to prevent ESI processing the error body
*/
if (context->flags.passthrough) {
clientStreamCallback (thisNode, http, rep, receivedData);
clientStreamCallback(thisNode, http, reply, receivedData);
return;
}

Expand Down Expand Up @@ -767,7 +767,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply
}

/* EOF / Read error / aborted entry */
if (rep == nullptr && receivedData.data == nullptr && receivedData.length == 0 && !context->flags.finishedtemplate) {
if (!reply && receivedData.data == nullptr && receivedData.length == 0 && !context->flags.finishedtemplate) {
/* TODO: get stream status to test the entry for aborts */
/* else flush the esi processor */
debugs(86, 5, "esiProcess: " << context.getRaw() << " Finished reading upstream data");
Expand Down
16 changes: 8 additions & 8 deletions src/esi/Include.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ esiBufferDetach (clientStreamNode *node, ClientHttpRequest *http)
* not be reinstated or it will trigger bug #975 again - RBC 20060903
*/
void
esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *rep, StoreIOBuffer receivedData)
esiBufferRecipient(clientStreamNode *node, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
/* Test preconditions */
assert (node != nullptr);
Expand All @@ -80,24 +80,24 @@ esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *
assert (receivedData.length <= sizeof(esiStream->localbuffer->buf));
assert (!esiStream->finished);

debugs (86,5, "rep " << rep << " body " << receivedData.data << " len " << receivedData.length);
debugs(86, 5, "reply " << reply << " body " << receivedData.data << " len " << receivedData.length);
assert (node->readBuffer.offset == receivedData.offset || receivedData.length == 0);

/* trivial case */

if (http->out.offset != 0) {
assert(rep == nullptr);
assert(!reply);
} else {
if (rep) {
if (rep->sline.status() != Http::scOkay) {
rep = nullptr;
if (reply) {
if (reply->sline.status() != Http::scOkay) {
reply = nullptr;
esiStream->include->includeFail (esiStream);
esiStream->finished = 1;
httpRequestFree (http);
return;
}

rep = nullptr;
reply = nullptr;
}
}

Expand All @@ -121,7 +121,7 @@ esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *
}

/* EOF / Read error / aborted entry */
if (rep == nullptr && receivedData.data == nullptr && receivedData.length == 0) {
if (!reply && receivedData.data == nullptr && receivedData.length == 0) {
/* TODO: get stream status to test the entry for aborts */
debugs(86, 5, "Finished reading upstream data in subrequest");
esiStream->include->subRequestDone (esiStream, true);
Expand Down
39 changes: 19 additions & 20 deletions src/servers/FtpServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ CBDATA_NAMESPACED_CLASS_INIT(Ftp, Server);

namespace Ftp
{
static void PrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix = "");
static void PrintReply(MemBuf &, const HttpReplyPointer &, const char *const prefix = "");
static bool SupportedCommand(const SBuf &name);
static bool CommandHasPathParameter(const SBuf &cmd);
};
Expand Down Expand Up @@ -504,13 +504,12 @@ Ftp::Server::writeReply(MemBuf &mb)
}

void
Ftp::Server::writeCustomReply(const int code, const char *msg, const HttpReply *reply)
Ftp::Server::writeCustomReply(const int code, const char *msg, const HttpReplyPointer &reply)
{
debugs(33, 7, code << ' ' << msg);
assert(99 < code && code < 1000);

const bool sendDetails = reply != nullptr &&
reply->header.has(Http::HdrType::FTP_STATUS) && reply->header.has(Http::HdrType::FTP_REASON);
const bool sendDetails = reply && reply->header.has(Http::HdrType::FTP_STATUS) && reply->header.has(Http::HdrType::FTP_REASON);

MemBuf mb;
mb.init();
Expand Down Expand Up @@ -775,7 +774,7 @@ Ftp::Server::parseOneRequest()
}

void
Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
Ftp::Server::handleReply(const HttpReplyPointer &reply, StoreIOBuffer data)
{
// the caller guarantees that we are dealing with the current context only
Http::StreamPointer context = pipeline.front();
Expand Down Expand Up @@ -809,7 +808,7 @@ Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
}

void
Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handleFeatReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
if (pipeline.front()->http->request->error) {
writeCustomReply(502, "Server does not support FEAT", reply);
Expand Down Expand Up @@ -878,7 +877,7 @@ Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
}

void
Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handlePasvReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
const Http::StreamPointer context(pipeline.front());
assert(context != nullptr);
Expand Down Expand Up @@ -918,7 +917,7 @@ Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
}

void
Ftp::Server::handlePortReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handlePortReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
if (pipeline.front()->http->request->error) {
writeCustomReply(502, "Server does not support PASV (converted from PORT)", reply);
Expand All @@ -931,7 +930,7 @@ Ftp::Server::handlePortReply(const HttpReply *reply, StoreIOBuffer)
}

void
Ftp::Server::handleErrorReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handleErrorReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
if (!pinning.pinned) // we failed to connect to server
uri.clear();
Expand All @@ -940,9 +939,9 @@ Ftp::Server::handleErrorReply(const HttpReply *reply, StoreIOBuffer)
}

void
Ftp::Server::handleDataReply(const HttpReply *reply, StoreIOBuffer data)
Ftp::Server::handleDataReply(const HttpReplyPointer &reply, StoreIOBuffer data)
{
if (reply != nullptr && reply->sline.status() != Http::scOkay) {
if (reply && reply->sline.status() != Http::scOkay) {
writeForwardedReply(reply);
if (Comm::IsConnOpen(dataConn)) {
debugs(33, 3, "closing " << dataConn << " on KO reply");
Expand Down Expand Up @@ -1026,14 +1025,14 @@ Ftp::Server::replyDataWritingCheckpoint()
}

void
Ftp::Server::handleUploadReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handleUploadReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
writeForwardedReply(reply);
// note that the client data connection may already be closed by now
}

void
Ftp::Server::writeForwardedReply(const HttpReply *reply)
Ftp::Server::writeForwardedReply(const HttpReplyPointer &reply)
{
Must(reply);

Expand All @@ -1056,7 +1055,7 @@ Ftp::Server::writeForwardedReply(const HttpReply *reply)
}

void
Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handleEprtReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
if (pipeline.front()->http->request->error) {
writeCustomReply(502, "Server does not support PASV (converted from EPRT)", reply);
Expand All @@ -1069,7 +1068,7 @@ Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
}

void
Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
Ftp::Server::handleEpsvReply(const HttpReplyPointer &reply, StoreIOBuffer)
{
if (pipeline.front()->http->request->error) {
writeCustomReply(502, "Cannot connect to server", reply);
Expand All @@ -1092,7 +1091,7 @@ Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)

/// writes FTP error response with given status and reply-derived error details
void
Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
Ftp::Server::writeErrorReply(const HttpReplyPointer &reply, const int scode)
{
const HttpRequest *request = pipeline.front()->http->request;
assert(request);
Expand Down Expand Up @@ -1140,7 +1139,7 @@ Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
/// writes FTP response based on HTTP reply that is not an FTP-response wrapper
/// for example, internally-generated Squid "errorpages" end up here (for now)
void
Ftp::Server::writeForwardedForeign(const HttpReply *reply)
Ftp::Server::writeForwardedForeign(const HttpReplyPointer &reply)
{
changeState(fssConnected, "foreign reply");
closeDataConnection();
Expand All @@ -1149,7 +1148,7 @@ Ftp::Server::writeForwardedForeign(const HttpReply *reply)
}

bool
Ftp::Server::writeControlMsgAndCall(HttpReply *reply, AsyncCall::Pointer &call)
Ftp::Server::writeControlMsgAndCall(const HttpReplyPointer &reply, AsyncCall::Pointer &call)
{
// the caller guarantees that we are dealing with the current context only
// the caller should also make sure reply->header.has(Http::HdrType::FTP_STATUS)
Expand All @@ -1158,7 +1157,7 @@ Ftp::Server::writeControlMsgAndCall(HttpReply *reply, AsyncCall::Pointer &call)
}

void
Ftp::Server::writeForwardedReplyAndCall(const HttpReply *reply, AsyncCall::Pointer &call)
Ftp::Server::writeForwardedReplyAndCall(const HttpReplyPointer &reply, AsyncCall::Pointer &call)
{
assert(reply != nullptr);
const HttpHeader &header = reply->header;
Expand Down Expand Up @@ -1210,7 +1209,7 @@ Ftp::Server::writeForwardedReplyAndCall(const HttpReply *reply, AsyncCall::Point
}

static void
Ftp::PrintReply(MemBuf &mb, const HttpReply *reply, const char *const)
Ftp::PrintReply(MemBuf &mb, const HttpReplyPointer &reply, const char *const)
{
const HttpHeader &header = reply->header;

Expand Down
Loading

0 comments on commit 7516249

Please sign in to comment.