Skip to content

Commit

Permalink
Merge pull request #51 from bbockelm/streaming_io_v2
Browse files Browse the repository at this point in the history
Switch S3 writes to use streaming I/O
  • Loading branch information
jhiemstrawisc authored Dec 5, 2024
2 parents fe48042 + 4ca3895 commit 87a041b
Show file tree
Hide file tree
Showing 18 changed files with 1,254 additions and 295 deletions.
34 changes: 28 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ project( xrootd-http/s3 )

# Build-time switches (all default to OFF):
#   XROOTD_PLUGINS_BUILD_UNITTESTS - build this project's unit tests.
#   XROOTD_PLUGINS_EXTERNAL_GTEST  - use a pre-installed GTest instead of the copy
#                                    pulled in through ExternalProject.
#   VALGRIND                       - run select unit tests under valgrind.
option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the xrootd-http/s3 unit tests" OFF )
option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF )
option( VALGRIND "Run select unit tests under valgrind" OFF )

# Search <source>/cmake for this project's helper modules.
set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake )

# Default to a Debug build, but only when the user has not chosen a build type.
# Multi-config generators ignore CMAKE_BUILD_TYPE, so it is left alone for them.
get_property( xrootd_plugins_is_multi_config GLOBAL PROPERTY GENERATOR_IS_MULTI_CONFIG )
if( NOT xrootd_plugins_is_multi_config AND NOT CMAKE_BUILD_TYPE )
  set( CMAKE_BUILD_TYPE Debug )
endif()
Expand All @@ -20,6 +21,10 @@ if(NOT XROOTD_PLUGIN_VERSION)
set(XROOTD_PLUGIN_VERSION ${XROOTD_PLUGIN_VERSION} CACHE INTERNAL "")
endif()

# When valgrind runs are requested, locate the valgrind executable and abort
# configuration if it cannot be found. An explicit check is used rather than
# find_program's REQUIRED keyword, which is only understood by CMake >= 3.18;
# older versions would silently continue with VALGRIND_BIN-NOTFOUND.
if(VALGRIND)
  find_program(VALGRIND_BIN valgrind)
  if(NOT VALGRIND_BIN)
    message(FATAL_ERROR "VALGRIND is enabled but the valgrind executable could not be found")
  endif()
endif()

macro(use_cxx17)
if (CMAKE_VERSION VERSION_LESS "3.1")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down Expand Up @@ -66,17 +71,29 @@ add_definitions( -D_FILE_OFFSET_BITS=64 )

# Directory-scoped header search paths: the directories listed in XROOTD_INCLUDES,
# CURL_INCLUDE_DIRS and LIBCRYPTO_INCLUDE_DIRS are added for the targets in this directory.
include_directories(${XROOTD_INCLUDES} ${CURL_INCLUDE_DIRS} ${LIBCRYPTO_INCLUDE_DIRS})

add_library(XrdS3 SHARED src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
add_library(XrdHTTPServer SHARED src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
# On Linux, we hide all the symbols for the final libraries, exposing only what's needed for the XRootD
# runtime loader. So the sources are compiled once into an object library; the plugin is built from
# those objects, and a separate library with the symbols exposed is built from them for testing.
add_library(XrdS3Obj OBJECT
  src/CurlUtil.cc
  src/S3File.cc
  src/S3Directory.cc
  src/S3AccessInfo.cc
  src/S3FileSystem.cc
  src/AWSv4-impl.cc
  src/S3Commands.cc
  src/HTTPCommands.cc
  src/TokenFile.cc
  src/stl_string_utils.cc
  src/shortfile.cc
  src/logging.cc
)
# The objects are placed into loadable/shared libraries, so build them position independent.
set_target_properties(XrdS3Obj PROPERTIES POSITION_INDEPENDENT_CODE ON)
# Dependencies are declared on the object library so that every library assembled from it
# inherits them. Linking tinyxml2::tinyxml2 already propagates its include directories, so no
# target_include_directories() entry is needed (a target name is not a valid directory there).
# CMAKE_DL_LIBS is the portable spelling of the dlopen() library.
target_link_libraries(XrdS3Obj ${CMAKE_DL_LIBS} ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads)

# Loadable S3 plugin module, assembled from the objects compiled for XrdS3Obj.
add_library(
  XrdS3 MODULE
  "$<TARGET_OBJECTS:XrdS3Obj>"
)
# Link against the object library target as well as using its object files.
target_link_libraries(
  XrdS3
  XrdS3Obj
)

target_link_libraries(XrdS3 -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads)
target_link_libraries(XrdHTTPServer -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads)
# Object library holding the compiled sources of the HTTP server plugin; the libraries
# built from it reuse these objects instead of recompiling the sources.
add_library(XrdHTTPServerObj OBJECT
  src/CurlUtil.cc
  src/HTTPFile.cc
  src/HTTPFileSystem.cc
  src/HTTPCommands.cc
  src/TokenFile.cc
  src/stl_string_utils.cc
  src/shortfile.cc
  src/logging.cc
)
# The objects are placed into loadable/shared libraries, so build them position independent.
set_target_properties(XrdHTTPServerObj PROPERTIES POSITION_INDEPENDENT_CODE ON)
# CMAKE_DL_LIBS is the portable spelling of the dlopen() library (replaces a raw -ldl flag).
target_link_libraries(XrdHTTPServerObj ${CMAKE_DL_LIBS} ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads)

# Loadable HTTP server plugin module, assembled from the objects compiled for XrdHTTPServerObj.
add_library(
  XrdHTTPServer MODULE
  "$<TARGET_OBJECTS:XrdHTTPServerObj>"
)
# Link against the object library target as well as using its object files.
target_link_libraries(
  XrdHTTPServer
  XrdHTTPServerObj
)

# The CMake documentation strongly advises against using target_link_directories; instead,
# pkg_check_modules is supposed to fill in the full library paths in ${LIBCRYPTO_LIBRARIES}. As of
# cmake 3.26.1, this does not appear to be the case on Mac OS X. Remove once no longer necessary!
target_link_directories(XrdS3 PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdHTTPServer PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdS3Obj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdHTTPServerObj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})

if(NOT APPLE)
set_target_properties(XrdS3 PROPERTIES OUTPUT_NAME "XrdS3-${XROOTD_PLUGIN_VERSION}" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols")
Expand All @@ -94,6 +111,11 @@ install(
)

if( XROOTD_PLUGINS_BUILD_UNITTESTS )
add_library(XrdS3Testing SHARED "$<TARGET_OBJECTS:XrdS3Obj>")
target_link_libraries(XrdS3Testing XrdS3Obj)
add_library(XrdHTTPServerTesting SHARED "$<TARGET_OBJECTS:XrdHTTPServerObj>")
target_link_libraries(XrdHTTPServerTesting XrdHTTPServerObj)

if( NOT XROOTD_PLUGINS_EXTERNAL_GTEST )
include(ExternalProject)
ExternalProject_Add(gtest
Expand Down
4 changes: 2 additions & 2 deletions src/AWSv4-impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest,
free(buffer);
}

bool doSha256(const std::string &payload, unsigned char *messageDigest,
bool doSha256(const std::string_view payload, unsigned char *messageDigest,
unsigned int *mdLength) {
EVP_MD_CTX *mdctx = EVP_MD_CTX_create();
if (mdctx == NULL) {
Expand All @@ -116,7 +116,7 @@ bool doSha256(const std::string &payload, unsigned char *messageDigest,
return false;
}

if (!EVP_DigestUpdate(mdctx, payload.c_str(), payload.length())) {
if (!EVP_DigestUpdate(mdctx, payload.data(), payload.length())) {
EVP_MD_CTX_destroy(mdctx);
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion src/AWSv4-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <map>
#include <string>
#include <string_view>

namespace AWSv4Impl {

Expand All @@ -34,7 +35,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest,
unsigned int mdLength,
std::string &hexEncoded);

bool doSha256(const std::string &payload, unsigned char *messageDigest,
bool doSha256(const std::string_view payload, unsigned char *messageDigest,
unsigned int *mdLength);

bool createSignature(const std::string &secretAccessKey,
Expand Down
45 changes: 34 additions & 11 deletions src/CurlUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ void CurlWorker::Run() {
// is waiting on it is undefined behavior.
auto queue_ref = m_queue;
auto &queue = *queue_ref.get();
m_logger.Log(LogMask::Debug, "CurlWorker::Run", "Started a curl worker");
m_unpause_queue.reset(new HandlerQueue());
m_logger.Log(LogMask::Debug, "Run", "Started a curl worker");

CURLM *multi_handle = curl_multi_init();
if (multi_handle == nullptr) {
Expand All @@ -199,21 +200,40 @@ void CurlWorker::Run() {
CURLMcode mres = CURLM_OK;

std::vector<struct curl_waitfd> waitfds;
waitfds.resize(1);
waitfds.resize(2);
// The `curl_multi_wait` call in the event loop needs to be interrupted when
// additional work comes into one of the two queues (either the global queue
// or the per-worker unpause queue). To do this, the queue objects will
// write to a file descriptor when a new HTTP request is ready; we add these
// FDs to the list of FDs for libcurl to poll in order to trigger a wakeup.
// The `Consume`/`TryConsume` methods will have a side-effect of reading
// from the pipe if a request is available.
waitfds[0].fd = queue.PollFD();
waitfds[0].events = CURL_WAIT_POLLIN;
waitfds[0].revents = 0;
waitfds[1].fd = m_unpause_queue->PollFD();
waitfds[1].events = CURL_WAIT_POLLIN;
waitfds[1].revents = 0;

while (true) {
while (running_handles < static_cast<int>(m_max_ops)) {
auto op = m_unpause_queue->TryConsume();
if (!op) {
break;
}
op->ContinueHandle();
}
while (running_handles < static_cast<int>(m_max_ops)) {
auto op =
running_handles == 0 ? queue.Consume() : queue.TryConsume();
if (!op) {
break;
}
op->SetUnpauseQueue(m_unpause_queue);

auto curl = queue.GetHandle();
if (curl == nullptr) {
m_logger.Log(LogMask::Debug, "CurlWorker",
m_logger.Log(LogMask::Debug, "Run",
"Unable to allocate a curl handle");
op->Fail("E_NOMEM", "Unable to get allocate a curl handle");
continue;
Expand All @@ -223,10 +243,10 @@ void CurlWorker::Run() {
op->Fail(op->getErrorCode(), op->getErrorMessage());
}
} catch (...) {
m_logger.Log(LogMask::Debug, "CurlWorker",
"Unable to setup the curl handle");
m_logger.Log(LogMask::Debug, "Run",
"Unable to set up the curl handle");
op->Fail("E_NOMEM",
"Failed to setup the curl handle for the operation");
"Failed to set up the curl handle for the operation");
continue;
}
m_op_map[curl] = op;
Expand All @@ -236,8 +256,7 @@ void CurlWorker::Run() {
std::stringstream ss;
ss << "Unable to add operation to the curl multi-handle: "
<< curl_multi_strerror(mres);
m_logger.Log(LogMask::Debug, "CurlWorker",
ss.str().c_str());
m_logger.Log(LogMask::Debug, "Run", ss.str().c_str());
}
op->Fail("E_CURL_LIB",
"Unable to add operation to the curl multi-handle");
Expand All @@ -253,7 +272,7 @@ void CurlWorker::Run() {
if (m_logger.getMsgMask() & LogMask::Debug) {
std::stringstream ss;
ss << "Curl worker thread " << getpid() << " is running "
<< running_handles << "operations";
<< running_handles << " operations";
m_logger.Log(LogMask::Debug, "CurlWorker", ss.str().c_str());
}
last_marker = now;
Expand All @@ -277,7 +296,8 @@ void CurlWorker::Run() {
} else if (mres != CURLM_OK) {
if (m_logger.getMsgMask() & LogMask::Warning) {
std::stringstream ss;
ss << "Failed to perform multi-handle operation: " << mres;
ss << "Failed to perform multi-handle operation: "
<< curl_multi_strerror(mres);
m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str());
}
break;
Expand All @@ -298,17 +318,20 @@ void CurlWorker::Run() {
}
auto &op = iter->second;
auto res = msg->data.result;
m_logger.Log(LogMask::Dump, "Run",
"Processing result from curl");
op->ProcessCurlResult(iter->first, res);
op->ReleaseHandle(iter->first);
op->Notify();
running_handles -= 1;
curl_multi_remove_handle(multi_handle, iter->first);
if (res == CURLE_OK) {
// If the handle was successful, then we can recycle it.
queue.RecycleHandle(iter->first);
} else {
curl_easy_cleanup(iter->first);
m_op_map.erase(iter);
}
m_op_map.erase(iter);
}
} while (msg);
}
Expand Down
34 changes: 17 additions & 17 deletions src/CurlUtil.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <memory>

// Forward dec'ls
typedef void CURL;
Expand All @@ -43,25 +43,25 @@ CURL *GetHandle(bool verbose);
* multi-curl driver thread is based on polling FD's
*/
class HandlerQueue {
  public:
    HandlerQueue();

    // Hand a request to the queue so that a curl worker can pick it up.
    void Produce(HTTPRequest *handler);

    // Remove a request from the queue.
    // NOTE(review): Consume() looks like the blocking variant and TryConsume()
    // the non-blocking one (a worker calls Consume() only when it has no
    // running transfers and treats a null result as "nothing to do") --
    // confirm against the implementation.
    HTTPRequest *Consume();
    HTTPRequest *TryConsume();

    // File descriptor a worker adds to its poll set in order to be woken up
    // when the queue has work for it.
    int PollFD() const { return m_read_fd; }

    // Obtain a curl easy handle for an operation; callers must cope with a
    // null result.
    CURL *GetHandle();
    // Give a handle back to the queue so that it can be reused.
    void RecycleHandle(CURL *);

  private:
    // Requests waiting to be consumed.
    std::deque<HTTPRequest *> m_ops;
    // Per-thread collection of curl handles.
    // NOTE(review): presumably the cache that RecycleHandle() fills and
    // GetHandle() drains -- confirm.
    thread_local static std::vector<CURL *> m_handles;
    std::condition_variable m_cv;
    std::mutex m_mutex;
    // NOTE(review): presumably the cap on queued requests before producers
    // have to wait -- confirm.
    const static unsigned m_max_pending_ops{20};
    // m_read_fd is what PollFD() exposes; -1 until set.
    // NOTE(review): the two descriptors look like the ends of a wakeup pipe.
    int m_read_fd{-1};
    int m_write_fd{-1};
};
41 changes: 21 additions & 20 deletions src/CurlWorker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@ class HTTPRequest;
class HandlerQueue;

// Drives HTTP requests taken from a shared HandlerQueue through libcurl.
class CurlWorker {
  public:
    // `queue` is the queue this worker consumes requests from; `logger`
    // receives the worker's log messages and must outlive the worker since
    // only a reference is stored.
    CurlWorker(std::shared_ptr<HandlerQueue> queue, XrdSysError &logger)
        : m_queue(queue), m_logger(logger) {}

    // Workers are not copyable.
    CurlWorker(const CurlWorker &) = delete;

    // The worker's event loop.
    void Run();
    // NOTE(review): presumably a thread entry point that forwards to
    // myself->Run() -- confirm against the implementation.
    static void RunStatic(CurlWorker *myself);
    // Value of the m_workers constant.
    static unsigned GetPollThreads() { return m_workers; }

  private:
    // Queue of new requests; shared with whoever produces them.
    std::shared_ptr<HandlerQueue> m_queue;
    // Queue for notifications that a handle can be unpaused.
    std::shared_ptr<HandlerQueue> m_unpause_queue;
    // Maps each curl handle in flight to the request it is serving.
    std::unordered_map<CURL *, HTTPRequest *> m_op_map;
    XrdSysError &m_logger;

    // NOTE(review): presumably the number of worker threads to start.
    const static unsigned m_workers{5};
    // Upper bound on the number of transfers one worker runs at a time.
    const static unsigned m_max_ops{20};
    // NOTE(review): presumably the interval, in seconds, between periodic
    // status log messages -- confirm.
    const static unsigned m_marker_period{5};
};
Loading

0 comments on commit 87a041b

Please sign in to comment.