From b92f13468eea5abab00a5dfa568935178a79a0d4 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 28 Oct 2024 08:30:12 -0500 Subject: [PATCH 01/11] Add unit test coverage for S3 upload This provides some minimal test cases for testing the S3 write code. --- CMakeLists.txt | 34 +++- test/CMakeLists.txt | 54 +++--- test/s3-setup.sh | 8 +- test/s3_unit_tests.cc | 421 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 484 insertions(+), 33 deletions(-) create mode 100644 test/s3_unit_tests.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 6226b04..0b2deac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ project( xrootd-http/s3 ) option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF ) option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF ) +option( VALGRIND "Run select unit tests under valgrind" OFF ) set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) set( CMAKE_BUILD_TYPE Debug ) @@ -20,6 +21,10 @@ if(NOT XROOTD_PLUGIN_VERSION) set(XROOTD_PLUGIN_VERSION ${XROOTD_PLUGIN_VERSION} CACHE INTERNAL "") endif() +if(VALGRIND) + find_program(VALGRIND_BIN valgrind REQUIRED) +endif() + macro(use_cxx17) if (CMAKE_VERSION VERSION_LESS "3.1") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") @@ -66,17 +71,29 @@ add_definitions( -D_FILE_OFFSET_BITS=64 ) include_directories(${XROOTD_INCLUDES} ${CURL_INCLUDE_DIRS} ${LIBCRYPTO_INCLUDE_DIRS}) -add_library(XrdS3 SHARED src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) -add_library(XrdHTTPServer SHARED src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +# On Linux, we hide all the symbols for the final libraries, exposing only what's needed for the XRootD +# runtime loader. So here we create the object library and will create a separate one for testing with +# the symbols exposed. 
+add_library(XrdS3Obj OBJECT src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +target_include_directories(XrdS3Obj INTERFACE tinyxml2::tinyxml2) +set_target_properties(XrdS3Obj PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries(XrdS3Obj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads) + +add_library(XrdS3 MODULE "$") +target_link_libraries(XrdS3 XrdS3Obj) -target_link_libraries(XrdS3 -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads) -target_link_libraries(XrdHTTPServer -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads) +add_library(XrdHTTPServerObj OBJECT src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +set_target_properties(XrdHTTPServerObj PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries(XrdHTTPServerObj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads) + +add_library(XrdHTTPServer MODULE "$") +target_link_libraries(XrdHTTPServer XrdHTTPServerObj) # The CMake documentation strongly advises against using these macros; instead, the pkg_check_modules # is supposed to fill out the full path to ${LIBCRYPTO_LIBRARIES}. As of cmake 3.26.1, this does not # appear to be the case on Mac OS X. Remove once no longer necessary! -target_link_directories(XrdS3 PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) -target_link_directories(XrdHTTPServer PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) +target_link_directories(XrdS3Obj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) +target_link_directories(XrdHTTPServerObj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) if(NOT APPLE) set_target_properties(XrdS3 PROPERTIES OUTPUT_NAME "XrdS3-${XROOTD_PLUGIN_VERSION}" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols") @@ -94,6 +111,11 @@ install( ) if( XROOTD_PLUGINS_BUILD_UNITTESTS ) + add_library(XrdS3Testing SHARED "$") + target_link_libraries(XrdS3Testing XrdS3Obj) + add_library(XrdHTTPServerTesting SHARED "$") + target_link_libraries(XrdHTTPServerTesting XrdHTTPServerObj) + if( NOT XROOTD_PLUGINS_EXTERNAL_GTEST ) include(ExternalProject) ExternalProject_Add(gtest diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 21d5c8a..49676d6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,29 +1,10 @@ -add_executable( s3-gtest s3_tests.cc - ../src/AWSv4-impl.cc - ../src/CurlUtil.cc - ../src/logging.cc - ../src/S3AccessInfo.cc - ../src/S3Directory.cc - ../src/S3File.cc - ../src/S3FileSystem.cc - ../src/shortfile.cc - ../src/stl_string_utils.cc - ../src/TokenFile.cc - ../src/HTTPCommands.cc - ../src/S3Commands.cc -) +add_executable( s3-gtest s3_tests.cc ) -add_executable( http-gtest http_tests.cc - ../src/CurlUtil.cc - ../src/HTTPFile.cc - ../src/HTTPFileSystem.cc - ../src/HTTPCommands.cc - ../src/stl_string_utils.cc - ../src/TokenFile.cc - ../src/shortfile.cc - ../src/logging.cc -) +add_executable( s3-unit-test s3_unit_tests.cc ) +add_executable( http-gtest http_tests.cc ) + +include(GoogleTest) if(XROOTD_PLUGINS_EXTERNAL_GTEST) set(LIBGTEST "gtest") @@ -34,9 +15,16 @@ else() set(LIBGTEST "${CMAKE_BINARY_DIR}/external/gtest/src/gtest-build/lib/libgtest.a") 
endif() -target_link_libraries(s3-gtest XrdS3 "${LIBGTEST}" pthread) -target_link_libraries(http-gtest XrdHTTPServer "${LIBGTEST}" pthread) +target_link_libraries(s3-gtest XrdS3Testing "${LIBGTEST}" pthread) +target_link_libraries(s3-unit-test XrdS3Testing "${LIBGTEST}" pthread) +target_link_libraries(http-gtest XrdHTTPServerTesting "${LIBGTEST}" pthread) +gtest_add_tests(TARGET s3-unit-test TEST_LIST s3UnitTests) +set_tests_properties(${s3UnitTests} + PROPERTIES + FIXTURES_REQUIRED S3::s3_basic + ENVIRONMENT "ENV_FILE=${CMAKE_BINARY_DIR}/tests/s3_basic/setup.sh" +) add_test( NAME @@ -52,6 +40,20 @@ add_test( ${CMAKE_CURRENT_BINARY_DIR}/http-gtest "${CMAKE_BINARY_DIR}/tests/basic/setup.sh" ) +if (VALGRIND) + add_test( + NAME + valgrind-s3 + COMMAND + ${VALGRIND_BIN} ${CMAKE_CURRENT_BINARY_DIR}/s3-unit-test -R FileSystemS3Fixture.UploadLargePartAligned + ) + + set_tests_properties(valgrind-s3 + PROPERTIES + FIXTURES_REQUIRED S3::s3_basic + ) +endif() + set_tests_properties(http-unit PROPERTIES FIXTURES_REQUIRED HTTP::basic diff --git a/test/s3-setup.sh b/test/s3-setup.sh index 1665ad2..dd8e3c8 100755 --- a/test/s3-setup.sh +++ b/test/s3-setup.sh @@ -150,6 +150,8 @@ export MINIO_ROOT_USER=minioadmin export MINIO_ROOT_PASSWORD=QXDEiQxQw8qY MINIO_USER=miniouser MINIO_PASSWORD=2Z303QCzRI7s +printf "%s" "$MINIO_USER" > "$RUNDIR/access_key" +printf "%s" "$MINIO_PASSWORD" > "$RUNDIR/secret_key" # Launch minio "$MINIO_BIN" --certs-dir "$MINIO_CERTSDIR" server --address "$(hostname):0" "$MINIO_DATADIR" 0<&- >"$BINARY_DIR/tests/$TEST_NAME/server.log" 2>&1 & @@ -176,6 +178,8 @@ echo "Minio API server started on $MINIO_URL" cat > "$BINARY_DIR/tests/$TEST_NAME/setup.sh" < "$RUNDIR/hello_world.txt" #### export XROOTD_CONFIG="$XROOTD_CONFIGDIR/xrootd.cfg" +BUCKET_NAME=test-bucket cat > "$XROOTD_CONFIG" <> "$BINARY_DIR/tests/$TEST_NAME/setup.sh" < +#include +#include +#include + +#include +#include + +std::once_flag g_init_once; +std::string g_ca_file; +std::string g_minio_url; +std::string g_bucket_name; +std::string g_access_key_file; +std::string g_secret_key_file; + +void parseEnvFile(const std::string &fname) { + std::ifstream fh(fname); + if (!fh.is_open()) { + std::cerr << "Failed to open env file: " << strerror(errno); + exit(1); + } + std::string line; + while (std::getline(fh, line)) { + auto idx = line.find("="); + if (idx == std::string::npos) { + continue; + } + auto key = line.substr(0, idx); + auto val = line.substr(idx + 1); + if (key == "X509_CA_FILE") { + g_ca_file = val; + setenv("X509_CERT_FILE", g_ca_file.c_str(), 1); + } else if (key == "MINIO_URL") { + g_minio_url = val; + } else if (key == "BUCKET_NAME") { + g_bucket_name = val; + } else if (key == "ACCESS_KEY_FILE") { + g_access_key_file = val; + } else if (key == "SECRET_KEY_FILE") { + g_secret_key_file = val; + } + } +} + +class TestAmazonRequest : public AmazonRequest { + public: + XrdSysLogger log{}; + XrdSysError err{&log, "TestS3CommandsLog"}; + + TestAmazonRequest(const std::string &url, const std::string &akf, + const std::string &skf, const std::string &bucket, + const std::string &object, const std::string &path, + int sigVersion) + : AmazonRequest(url, akf, skf, bucket, object, path, sigVersion, err) {} + + // For getting access to otherwise-protected members + std::string getHostUrl() const { return hostUrl; } +}; + +TEST(TestS3URLGeneration, Test1) { + const std::string serviceUrl = "https://s3-service.com:443"; + const std::string b = "test-bucket"; + const std::string o = "test-object"; + + // Test path-style URL 
generation + TestAmazonRequest pathReq{serviceUrl, "akf", "skf", b, o, "path", 4}; + std::string generatedHostUrl = pathReq.getHostUrl(); + ASSERT_EQ(generatedHostUrl, + "https://s3-service.com:443/test-bucket/test-object"); + + // Test virtual-style URL generation + TestAmazonRequest virtReq{serviceUrl, "akf", "skf", b, o, "virtual", 4}; + generatedHostUrl = virtReq.getHostUrl(); + ASSERT_EQ(generatedHostUrl, + "https://test-bucket.s3-service.com:443/test-object"); + + // Test path-style with empty bucket (which we use for exporting an entire + // endpoint) + TestAmazonRequest pathReqNoBucket{serviceUrl, "akf", "skf", "", + o, "path", 4}; + generatedHostUrl = pathReqNoBucket.getHostUrl(); + ASSERT_EQ(generatedHostUrl, "https://s3-service.com:443/test-object"); +} + +class FileSystemFixtureBase : public testing::Test { + protected: + FileSystemFixtureBase() + : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation + {} + + void SetUp() override { + + setenv("XRDINSTANCE", "xrootd", 1); + char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX"; + auto result = mkstemp(tmp_configfn); + ASSERT_NE(result, -1) << "Failed to create temp file (" + << strerror(errno) << ", errno=" << errno << ")"; + m_configfn = std::string(tmp_configfn); + + auto contents = GetConfig(); + ASSERT_FALSE(contents.empty()); + ASSERT_TRUE(writeShortFile(m_configfn, contents, 0)) + << "Failed to write to temp file (" << strerror(errno) + << ", errno=" << errno << ")"; + } + + void TearDown() override { + if (!m_configfn.empty()) { + auto rv = unlink(m_configfn.c_str()); + ASSERT_EQ(rv, 0) << "Failed to delete temp file (" + << strerror(errno) << ", errno=" << errno << ")"; + } + } + + virtual std::string GetConfig() = 0; + + std::string m_configfn; + std::unique_ptr m_log; +}; + +class FileSystemS3VirtualBucket : public FileSystemFixtureBase { + protected: + virtual std::string GetConfig() override { + return R"( +s3.begin +s3.path_name /test +s3.bucket_name genome-browser +s3.service_name s3.amazonaws.com +s3.region us-east-1 +s3.service_url https://s3.us-east-1.amazonaws.com +s3.url_style virtual +s3.end +)"; + } +}; + +class FileSystemS3VirtualNoBucket : public FileSystemFixtureBase { + protected: + virtual std::string GetConfig() override { + return R"( +s3.begin +s3.path_name /test +s3.service_name s3.amazonaws.com +s3.region us-east-1 +s3.service_url https://s3.us-east-1.amazonaws.com +s3.url_style virtual +s3.end +)"; + } +}; + +class FileSystemS3PathBucket : public FileSystemFixtureBase { + protected: + virtual std::string GetConfig() override { + return R"( +s3.begin +s3.path_name /test +s3.service_name s3.amazonaws.com +s3.region us-east-1 +s3.bucket_name genome-browser +s3.service_url https://s3.us-east-1.amazonaws.com +s3.url_style path +s3.end +)"; + } +}; + +class FileSystemS3PathNoBucket : public FileSystemFixtureBase { + protected: + virtual std::string GetConfig() override { + return R"( +s3.begin +s3.path_name /test +s3.service_name s3.amazonaws.com +s3.region us-east-1 +s3.service_url https://s3.us-east-1.amazonaws.com +s3.url_style path +s3.end +)"; + } +}; + +// Regression test for when the service_url ends in a `/` +class FileSystemS3PathBucketSlash : public FileSystemFixtureBase { + protected: + virtual std::string GetConfig() override { + return R"( +s3.begin +s3.path_name /test +s3.service_name s3.amazonaws.com +s3.region us-east-1 +s3.bucket_name genome-browser +s3.service_url https://s3.us-east-1.amazonaws.com/ +s3.url_style path +s3.end +)"; + } +}; + 
+TEST_F(FileSystemS3VirtualBucket, Create) { + EXPECT_NO_THROW( + { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); +} + +TEST_F(FileSystemS3VirtualNoBucket, Create) { + EXPECT_NO_THROW( + { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); +} + +TEST_F(FileSystemS3PathBucket, Create) { + EXPECT_NO_THROW( + { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); +} + +TEST_F(FileSystemS3PathNoBucket, Create) { + EXPECT_NO_THROW( + { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); +} + +TEST_F(FileSystemS3PathBucketSlash, Create) { + EXPECT_NO_THROW( + { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); +} + +// Tests where we query S3 test fixture +class FileSystemS3Fixture : public FileSystemFixtureBase { + void SetUp() override { + std::call_once(g_init_once, [&] { + char *env_file = getenv("ENV_FILE"); + ASSERT_NE(env_file, nullptr) << "$ENV_FILE environment variable " + "not set; required to run test"; + parseEnvFile(env_file); + + auto logger = new XrdSysLogger(2, 0); + auto log = new XrdSysError(logger, "curl_"); + AmazonRequest::Init(*log); + }); + + FileSystemFixtureBase::SetUp(); + } + + virtual std::string GetConfig() override { + return R"( +xrd.tlsca certfile )" + + g_ca_file + R"( +#s3.trace all dump +s3.trace all +s3.begin +s3.path_name /test +s3.access_key_file )" + + g_access_key_file + R"( +s3.secret_key_file )" + + g_secret_key_file + R"( +s3.service_name s3.example.com +s3.region us-east-1 +s3.bucket_name )" + + g_bucket_name + R"( +s3.service_url )" + + g_minio_url + R"( +s3.url_style path +s3.end + )"; + } + + public: + void WritePattern(const std::string &name, const off_t writeSize, + const char chunkByte, const size_t chunkSize) { + XrdSysLogger log; + S3FileSystem fs(&log, m_configfn.c_str(), nullptr); + + std::unique_ptr fh(fs.newFile()); + ASSERT_TRUE(fh); + + XrdOucEnv env; + env.Put("oss.asize", std::to_string(writeSize).c_str()); + auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env); + ASSERT_EQ(rv, 0); + + size_t sizeToWrite = (chunkSize >= writeSize) ? writeSize : chunkSize; + off_t curWriteSize = writeSize; + char curChunkByte = chunkByte; + off_t offset = 0; + while (sizeToWrite) { + std::string writeBuffer(sizeToWrite, curChunkByte); + + std::cerr << "Writing bytes at offset: " << offset << std::endl; + rv = fh->Write(writeBuffer.data(), offset, sizeToWrite); + ASSERT_EQ(rv, sizeToWrite); + + curWriteSize -= rv; + offset += rv; + sizeToWrite = + (chunkSize >= curWriteSize) ? curWriteSize : chunkSize; + curChunkByte += 1; + } + + rv = fh->Close(); + ASSERT_EQ(rv, 0); + + VerifyContents(fs, name, writeSize, chunkByte, chunkSize); + } + + private: + void VerifyContents(S3FileSystem &fs, const std::string &obj, + off_t expectedSize, char chunkByte, size_t chunkSize) { + std::unique_ptr fh(fs.newFile()); + ASSERT_TRUE(fh); + + XrdOucEnv env; + auto rv = fh->Open(obj.c_str(), O_RDONLY, 0, env); + ASSERT_EQ(rv, 0); + + size_t sizeToRead = + (chunkSize >= expectedSize) ? expectedSize : chunkSize; + char curChunkByte = chunkByte; + off_t offset = 0; + while (sizeToRead) { + std::string readBuffer(sizeToRead, curChunkByte - 1); + rv = fh->Read(readBuffer.data(), offset, sizeToRead); + ASSERT_EQ(rv, sizeToRead); + readBuffer.resize(rv); + + std::string correctBuffer(sizeToRead, curChunkByte); + ASSERT_EQ(readBuffer, correctBuffer); + + expectedSize -= rv; + offset += rv; + sizeToRead = (chunkSize >= expectedSize) ? 
expectedSize : chunkSize; + curChunkByte += 1; + } + + rv = fh->Close(); + ASSERT_EQ(rv, 0); + } +}; + +// Upload a single byte into S3 +TEST_F(FileSystemS3Fixture, UploadOneByte) { + WritePattern("/test/write_one.txt", 1, 'X', 32 * 1024); +} + +// Upload across multiple calls, single part +TEST_F(FileSystemS3Fixture, UploadMultipleCalls) { + WritePattern("/test/write_alphabet.txt", 26, 'a', 1); +} + +// Upload a zero-byte object +TEST_F(FileSystemS3Fixture, UploadZero) { + WritePattern("/test/write_zero.txt", 0, 'X', 32 * 1024); +} + +// Upload larger - a few chunks. +TEST_F(FileSystemS3Fixture, UploadMultipleChunks) { + WritePattern("/test/write_multi_chunks.txt", (10'000 / 1'024) * 1'024 + 42, + 'a', 1'024); +} + +// Upload across multiple parts, not aligned to partition. +TEST_F(FileSystemS3Fixture, UploadLarge) { + WritePattern("/test/write_large_1.txt", + (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720); +} + +// Upload a file into S3 that's the same size as the partition size +TEST_F(FileSystemS3Fixture, UploadLargePart) { + WritePattern("/test/write_large_2.txt", 100'000'000, 'a', 131'072); +} + +// Upload a small file where the partition size is aligned with the chunk size +TEST_F(FileSystemS3Fixture, UploadSmallAligned) { + WritePattern("/test/write_large_3.txt", 1'000, 'a', 1'000); +} + +// Upload a file into S3 that's the same size as the partition size, using +// chunks that align with the partition size +TEST_F(FileSystemS3Fixture, UploadLargePartAligned) { + WritePattern("/test/write_large_4.txt", 100'000'000, 'a', 1'000'000); +} + +// Upload a file into S3 resulting in multiple partitions +TEST_F(FileSystemS3Fixture, UploadMultiPartAligned) { + WritePattern("/test/write_large_5.txt", 100'000'000, 'a', 1'000'000); +} + +// Upload a file into S3 resulting in multiple partitioned using not-aligned +// chunks +TEST_F(FileSystemS3Fixture, UploadMultiPartUnaligned) { + WritePattern("/test/write_large_1.txt", 100'000'000, 'a', 32'768); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 538d4784ac5083056031982c20036d83a952834e Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 28 Oct 2024 08:47:38 -0500 Subject: [PATCH 02/11] Minor logging fixups Most significantly, switches the debug/dump of the libcurl interaction to use the XRootD logging framework instead of printing right to stderr. --- src/CurlUtil.cc | 13 +++--- src/HTTPCommands.cc | 105 +++++++++++++++++++++++++++++++------------- src/logging.cc | 2 +- 3 files changed, 83 insertions(+), 37 deletions(-) diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 9e23265..0f1d4a8 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -187,7 +187,7 @@ void CurlWorker::Run() { // is waiting on it is undefined behavior. auto queue_ref = m_queue; auto &queue = *queue_ref.get(); - m_logger.Log(LogMask::Debug, "CurlWorker::Run", "Started a curl worker"); + m_logger.Log(LogMask::Debug, "Run", "Started a curl worker"); CURLM *multi_handle = curl_multi_init(); if (multi_handle == nullptr) { @@ -213,7 +213,7 @@ void CurlWorker::Run() { } auto curl = queue.GetHandle(); if (curl == nullptr) { - m_logger.Log(LogMask::Debug, "CurlWorker", + m_logger.Log(LogMask::Debug, "Run", "Unable to allocate a curl handle"); op->Fail("E_NOMEM", "Unable to get allocate a curl handle"); continue; @@ -223,7 +223,7 @@ void CurlWorker::Run() { op->Fail(op->getErrorCode(), op->getErrorMessage()); } } catch (...) 
{ - m_logger.Log(LogMask::Debug, "CurlWorker", + m_logger.Log(LogMask::Debug, "Run", "Unable to setup the curl handle"); op->Fail("E_NOMEM", "Failed to setup the curl handle for the operation"); @@ -236,8 +236,7 @@ void CurlWorker::Run() { std::stringstream ss; ss << "Unable to add operation to the curl multi-handle: " << curl_multi_strerror(mres); - m_logger.Log(LogMask::Debug, "CurlWorker", - ss.str().c_str()); + m_logger.Log(LogMask::Debug, "Run", ss.str().c_str()); } op->Fail("E_CURL_LIB", "Unable to add operation to the curl multi-handle"); @@ -253,7 +252,7 @@ void CurlWorker::Run() { if (m_logger.getMsgMask() & LogMask::Debug) { std::stringstream ss; ss << "Curl worker thread " << getpid() << " is running " - << running_handles << "operations"; + << running_handles << " operations"; m_logger.Log(LogMask::Debug, "CurlWorker", ss.str().c_str()); } last_marker = now; @@ -298,6 +297,8 @@ void CurlWorker::Run() { } auto &op = iter->second; auto res = msg->data.result; + m_logger.Log(LogMask::Dump, "Run", + "Processing result from curl"); op->ProcessCurlResult(iter->first, res); op->ReleaseHandle(iter->first); running_handles -= 1; diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index de6a6fa..df3dbf0 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -115,61 +115,72 @@ bool HTTPRequest::SendHTTPRequest(const std::string &payload) { return sendPreparedRequest(hostUrl, payload); } -static void dump(const char *text, FILE *stream, unsigned char *ptr, +static void dump(XrdSysError *log, const char *text, unsigned char *ptr, size_t size) { size_t i; size_t c; unsigned int width = 0x10; + if (!log) + return; - fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, - (long)size); + std::stringstream ss; + std::string stream; + formatstr(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, + (long)size); + ss << stream; for (i = 0; i < size; i += width) { - fprintf(stream, "%4.4lx: ", (long)i); + formatstr(stream, "%4.4lx: ", (long)i); + ss << stream; /* show hex to the left */ for (c = 0; c < width; c++) { - if (i + c < size) - fprintf(stream, "%02x ", ptr[i + c]); - else - fputs(" ", stream); + if (i + c < size) { + formatstr(stream, "%02x ", ptr[i + c]); + ss << stream; + } else { + ss << " "; + } } /* show data on the right */ for (c = 0; (c < width) && (i + c < size); c++) { char x = (ptr[i + c] >= 0x20 && ptr[i + c] < 0x80) ? 
ptr[i + c] : '.'; - fputc(x, stream); + ss << x; } - - fputc('\n', stream); /* newline */ + ss << std::endl; } + log->Log(LogMask::Dump, "Curl", ss.str().c_str()); } -static void dump_plain(const char *text, FILE *stream, unsigned char *ptr, - size_t size) { - fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, - (long)size); - fwrite(ptr, 1, size, stream); - fputs("\n", stream); +static void dumpPlain(XrdSysError *log, const char *text, unsigned char *ptr, + size_t size) { + if (!log) + return; + std::string info; + formatstr(info, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, + (long)size); + log->Log(LogMask::Dump, "Curl", info.c_str()); } int debugCallback(CURL *handle, curl_infotype ci, char *data, size_t size, void *clientp) { const char *text; (void)handle; /* prevent compiler warning */ - (void)clientp; + auto log = static_cast(clientp); + if (!log) + return 0; switch (ci) { case CURLINFO_TEXT: - fputs("== Info: ", stderr); - fwrite(data, size, 1, stderr); + log->Log(LogMask::Dump, "CurlInfo", std::string(data, size).c_str()); default: /* in case a new one is introduced to shock us */ return 0; case CURLINFO_HEADER_OUT: text = "=> Send header"; - dump_plain(text, stderr, (unsigned char *)data, size); + dumpPlain(log, text, (unsigned char *)data, size); break; } return 0; @@ -179,18 +190,25 @@ int debugAndDumpCallback(CURL *handle, curl_infotype ci, char *data, size_t size, void *clientp) { const char *text; (void)handle; /* prevent compiler warning */ - (void)clientp; + auto log = reinterpret_cast(clientp); + if (!log) + return 0; + std::stringstream ss; switch (ci) { case CURLINFO_TEXT: - fputs("== Info: ", stderr); - fwrite(data, size, 1, stderr); + if (size && data[size - 1] == '\n') { + ss << std::string(data, size - 1); + } else { + ss << std::string(data, size); + } + log->Log(LogMask::Dump, "CurlInfo", ss.str().c_str()); default: /* in case a new one is introduced to shock us */ return 0; case CURLINFO_HEADER_OUT: text = "=> Send header"; - dump_plain(text, stderr, (unsigned char *)data, size); + dumpPlain(log, text, (unsigned char *)data, size); break; case CURLINFO_DATA_OUT: text = "=> Send data"; @@ -208,7 +226,7 @@ int debugAndDumpCallback(CURL *handle, curl_infotype ci, char *data, text = "<= Recv SSL data"; break; } - dump(text, stderr, (unsigned char *)data, size); + dump(log, text, (unsigned char *)data, size); return 0; } @@ -467,18 +485,45 @@ bool HTTPRequest::SetupHandle(CURL *curl) { rv = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_list.get()); if (rv != CURLE_OK) { - this->errorCode = "E_CURL_LIB"; - this->errorMessage = "curl_easy_setopt( CURLOPT_HTTPHEADER ) failed."; + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_setopt( CURLOPT_HTTPHEADER ) failed."; return false; } if (m_log.getMsgMask() & LogMask::Debug) { rv = curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugCallback); - rv = curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function"; + return false; + } + rv = curl_easy_setopt(curl, CURLOPT_DEBUGDATA, &m_log); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function handler data"; + return false; + } } if (m_log.getMsgMask() & LogMask::Dump) { rv = curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugAndDumpCallback); - rv = curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function"; + return 
false; + } + rv = curl_easy_setopt(curl, CURLOPT_DEBUGDATA, &m_log); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function handler data"; + return false; + } + } + if (m_log.getMsgMask() & (LogMask::Dump | LogMask::Debug)) { + if (curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to enable verbose mode for libcurl"; + return false; + } } return true; diff --git a/src/logging.cc b/src/logging.cc index db1492b..6a398c7 100644 --- a/src/logging.cc +++ b/src/logging.cc @@ -37,7 +37,7 @@ std::string XrdHTTPServer::LogMaskToString(int mask) { has_entry = true; } if (mask & LogMask::Debug) { - ss << "debug"; + ss << (has_entry ? ", " : "") << "debug"; has_entry = true; } if (mask & LogMask::Info) { From e3f0a240717f8484e2a5b3f9dde3583d0bd6ff12 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 28 Oct 2024 08:56:46 -0500 Subject: [PATCH 03/11] Fixups: Linux compile fixes for s3 tests --- test/s3_unit_tests.cc | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index 30d106d..9ab72c0 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -303,7 +304,9 @@ s3.end auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env); ASSERT_EQ(rv, 0); - size_t sizeToWrite = (chunkSize >= writeSize) ? writeSize : chunkSize; + size_t sizeToWrite = (static_cast(chunkSize) >= writeSize) + ? static_cast(writeSize) + : chunkSize; off_t curWriteSize = writeSize; char curChunkByte = chunkByte; off_t offset = 0; @@ -312,12 +315,13 @@ s3.end std::cerr << "Writing bytes at offset: " << offset << std::endl; rv = fh->Write(writeBuffer.data(), offset, sizeToWrite); - ASSERT_EQ(rv, sizeToWrite); + ASSERT_EQ(rv, static_cast(sizeToWrite)); curWriteSize -= rv; offset += rv; - sizeToWrite = - (chunkSize >= curWriteSize) ? curWriteSize : chunkSize; + sizeToWrite = (static_cast(chunkSize) >= curWriteSize) + ? static_cast(curWriteSize) + : chunkSize; curChunkByte += 1; } @@ -337,14 +341,15 @@ s3.end auto rv = fh->Open(obj.c_str(), O_RDONLY, 0, env); ASSERT_EQ(rv, 0); - size_t sizeToRead = - (chunkSize >= expectedSize) ? expectedSize : chunkSize; + size_t sizeToRead = (static_cast(chunkSize) >= expectedSize) + ? expectedSize + : chunkSize; char curChunkByte = chunkByte; off_t offset = 0; while (sizeToRead) { std::string readBuffer(sizeToRead, curChunkByte - 1); rv = fh->Read(readBuffer.data(), offset, sizeToRead); - ASSERT_EQ(rv, sizeToRead); + ASSERT_EQ(rv, static_cast(sizeToRead)); readBuffer.resize(rv); std::string correctBuffer(sizeToRead, curChunkByte); @@ -352,7 +357,9 @@ s3.end expectedSize -= rv; offset += rv; - sizeToRead = (chunkSize >= expectedSize) ? expectedSize : chunkSize; + sizeToRead = (static_cast(chunkSize) >= expectedSize) + ? expectedSize + : chunkSize; curChunkByte += 1; } From 46b5b434dec06816b7e9080727304806ceca2b00 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 28 Oct 2024 09:36:43 -0500 Subject: [PATCH 04/11] Allow writes to stream to S3, not buffer This refactors the request logic to allow requests to be continued over multiple calls. The calling thread will regain control when the buffer has been completely consumed (even if the full operation will require additional buffers). Note this only works if the client provides the full file size. 
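
As a rough illustration of the intended calling pattern (this sketch is not
part of the patch; it assumes the `AmazonS3SendMultipartPart::SendRequest`
signature introduced below):

    #include <string>
    #include <string_view>
    #include <sys/types.h>
    #include <vector>
    #include "S3Commands.hh" // for AmazonS3SendMultipartPart

    // Stream one S3 part across several caller-provided buffers. Each
    // SendRequest() call blocks until libcurl has consumed that buffer; when
    // `final` is false, the read callback returns CURL_READFUNC_PAUSE instead
    // of 0 and the curl worker unpauses the handle once the next buffer is
    // handed over.
    bool StreamPart(AmazonS3SendMultipartPart &part, off_t totalSize,
                    const std::vector<std::string_view> &buffers,
                    const std::string &partNum, const std::string &uploadId) {
        off_t sent = 0;
        for (const auto &buf : buffers) {
            sent += static_cast<off_t>(buf.size());
            const bool final = (sent == totalSize); // last buffer ends the PUT
            if (!part.SendRequest(buf, partNum, uploadId, totalSize, final))
                return false; // details via getErrorCode()/getErrorMessage()
        }
        return true;
    }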
--- src/AWSv4-impl.cc | 4 +- src/AWSv4-impl.hh | 3 +- src/CurlUtil.cc | 6 ++ src/HTTPCommands.cc | 85 +++++++++++++++++++------ src/HTTPCommands.hh | 59 +++++++++++++++--- src/S3Commands.cc | 99 +++++++++++++++-------------- src/S3Commands.hh | 65 +++++++++++++++----- src/S3File.cc | 147 +++++++++++++++++++++++++++++++++----------- src/S3File.hh | 20 +++++- 9 files changed, 361 insertions(+), 127 deletions(-) diff --git a/src/AWSv4-impl.cc b/src/AWSv4-impl.cc index 1306742..c7d5d7a 100644 --- a/src/AWSv4-impl.cc +++ b/src/AWSv4-impl.cc @@ -104,7 +104,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, free(buffer); } -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength) { EVP_MD_CTX *mdctx = EVP_MD_CTX_create(); if (mdctx == NULL) { @@ -116,7 +116,7 @@ bool doSha256(const std::string &payload, unsigned char *messageDigest, return false; } - if (!EVP_DigestUpdate(mdctx, payload.c_str(), payload.length())) { + if (!EVP_DigestUpdate(mdctx, payload.data(), payload.length())) { EVP_MD_CTX_destroy(mdctx); return false; } diff --git a/src/AWSv4-impl.hh b/src/AWSv4-impl.hh index 8f17e2f..7cd0b84 100644 --- a/src/AWSv4-impl.hh +++ b/src/AWSv4-impl.hh @@ -20,6 +20,7 @@ #include #include +#include namespace AWSv4Impl { @@ -34,7 +35,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, unsigned int mdLength, std::string &hexEncoded); -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength); bool createSignature(const std::string &secretAccessKey, diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 0f1d4a8..2b4cc05 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -211,6 +211,12 @@ void CurlWorker::Run() { if (!op) { break; } + if (op->inProgress()) { + op->ContinueHandle(); + continue; + } + op->SetInProgress(true); + auto curl = queue.GetHandle(); if (curl == nullptr) { m_logger.Log(LogMask::Debug, "Run", diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index df3dbf0..fdcaa70 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -105,14 +105,8 @@ bool HTTPRequest::SendHTTPRequest(const std::string &payload) { } headers["Content-Type"] = "binary/octet-stream"; - std::string contentLength; - formatstr(contentLength, "%zu", payload.size()); - headers["Content-Length"] = contentLength; - // Another undocumented CURL feature: transfer-encoding is "chunked" - // by default for "PUT", which we really don't want. - headers["Transfer-Encoding"] = ""; - - return sendPreparedRequest(hostUrl, payload); + + return sendPreparedRequest(hostUrl, payload, payload.size(), true); } static void dump(XrdSysError *log, const char *text, unsigned char *ptr, @@ -230,6 +224,8 @@ int debugAndDumpCallback(CURL *handle, curl_infotype ci, char *data, return 0; } +void HTTPRequest::Payload::NotifyPaused() { m_parent.Notify(); } + // A callback function that gets passed to curl_easy_setopt for reading data // from the payload size_t read_callback(char *buffer, size_t size, size_t n, void *v) { @@ -239,31 +235,54 @@ size_t read_callback(char *buffer, size_t size, size_t n, void *v) { // been sent. 
HTTPRequest::Payload *payload = (HTTPRequest::Payload *)v; - if (payload->sentSoFar == payload->data->size()) { + if (payload->sentSoFar == static_cast(payload->data.size())) { payload->sentSoFar = 0; - return 0; + if (payload->final) { + return 0; + } else { + payload->NotifyPaused(); + return CURL_READFUNC_PAUSE; + } } size_t request = size * n; - if (request > payload->data->size()) { - request = payload->data->size(); + if (request > payload->data.size()) { + request = payload->data.size(); } - if (payload->sentSoFar + request > payload->data->size()) { - request = payload->data->size() - payload->sentSoFar; + if (payload->sentSoFar + request > payload->data.size()) { + request = payload->data.size() - payload->sentSoFar; } - memcpy(buffer, payload->data->data() + payload->sentSoFar, request); + memcpy(buffer, payload->data.data() + payload->sentSoFar, request); payload->sentSoFar += request; return request; } bool HTTPRequest::sendPreparedRequest(const std::string &uri, - const std::string &payload) { + const std::string_view payload, + off_t payload_size, bool final) { m_uri = uri; m_payload = payload; + m_payload_size = payload_size; + if (!m_is_streaming && !final) { + m_is_streaming = true; + } + m_final = final; + // Detect whether we were given an undersized buffer in non-streaming mode + if (!m_is_streaming && payload_size && + payload_size != static_cast(payload.size())) { + errorCode = "E_LOGIC"; + std::stringstream ss; + ss << "Logic error: given an undersized payload (have " + << payload.size() << ", expected " << payload_size + << ") in a non-streaming mode"; + errorMessage = ss.str(); + return false; + } + m_result_ready = false; m_queue->Produce(this); std::unique_lock lk(m_mtx); m_cv.wait(lk, [&] { return m_result_ready; }); @@ -272,6 +291,8 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri, } bool HTTPRequest::ReleaseHandle(CURL *curl) { + m_curl_handle = nullptr; + if (curl == nullptr) return false; // Note: Any option that's conditionally set in `HTTPRequest::SetupHandle` @@ -299,6 +320,18 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) { return true; } +bool HTTPRequest::ContinueHandle() { + if (!m_curl_handle) { + return false; + } + + m_callback_payload->data = m_payload; + m_callback_payload->final = m_final; + m_callback_payload->sentSoFar = 0; + curl_easy_pause(m_curl_handle, 0); + return true; +} + bool HTTPRequest::SetupHandle(CURL *curl) { m_log.Log(XrdHTTPServer::Debug, "SetupHandle", "Sending HTTP request", m_uri.c_str()); @@ -340,13 +373,19 @@ bool HTTPRequest::SetupHandle(CURL *curl) { return false; } - rv = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, m_payload.c_str()); + rv = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, m_payload.data()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_POSTFIELDS ) failed."; return false; } + + if (m_is_streaming) { + errorCode = "E_NOT_IMPL"; + errorMessage = + "Streaming posts not implemented in backend; internal error."; + } } if (httpVerb == "PUT") { @@ -361,7 +400,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { // and the offset of the data Here, we tell curl_easy_setopt to use the // read_callback function to read the data from the payload m_callback_payload = std::unique_ptr( - new HTTPRequest::Payload{&m_payload, 0}); + new HTTPRequest::Payload{m_payload, 0, m_final, *this}); rv = curl_easy_setopt(curl, CURLOPT_READDATA, m_callback_payload.get()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; @@ -376,6 +415,15 @@ bool 
HTTPRequest::SetupHandle(CURL *curl) { "curl_easy_setopt( CURLOPT_READFUNCTION ) failed."; return false; } + + if (m_payload_size || !m_is_streaming) { + if (curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, + m_payload_size) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = + "curl_easy_setopt( CURLOPT_INFILESIZE_LARGE ) failed."; + } + } } rv = curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); @@ -526,6 +574,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { } } + m_curl_handle = curl; return true; } diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 520c8fe..bfdc088 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -63,10 +64,15 @@ class HTTPRequest { const std::string &getErrorMessage() const { return errorMessage; } const std::string &getResultString() const { return m_result; } - // Currently only used in PUTS, but potentially useful elsewhere + // State of the payload upload for the curl callbacks struct Payload { - const std::string *data; - size_t sentSoFar; + std::string_view data; + off_t sentSoFar{0}; + bool final{true}; + HTTPRequest &m_parent; + + void NotifyPaused(); // Notify the parent request the curl handle has + // been paused }; // Initialize libraries for HTTP. @@ -76,11 +82,34 @@ class HTTPRequest { static void Init(XrdSysError &); protected: - bool sendPreparedRequest(const std::string &uri, - const std::string &payload); + // Send the request to the HTTP server. + // Blocks until the request has completed. + // If `final` is set to `false`, the HTTPRequest object will start streaming + // a request and assume that `sendPreparedRequest` will be repeated until + // all data is provided (the sum total of the chunks given is the + // payload_size). If payload_size is 0 and final is false, this indicates + // the complete size of the PUT is unknown and chunked encoding will be + // used. + // + // - url: URL, including query parameters, to use. + // - payload: The payload contents when uploading. + // - payload_size: Size of the entire payload (not just the current chunk). + // - final: True if this is the last or only payload for the request. False + // otherwise. + bool sendPreparedRequest(const std::string &url, + const std::string_view payload, off_t payload_size, + bool final); const std::string &getProtocol() { return m_protocol; } + // Returns true if the command is in-progress. + bool inProgress() const { return m_in_progress; } + + // Returns true if the command is a streaming/partial request. + // A streaming request is one that requires multiple calls to + // `sendPreparedRequest` to complete. + bool isStreamingRequest() const { return m_is_streaming; } + typedef std::map AttributeValueMap; AttributeValueMap query_parameters; AttributeValueMap headers; @@ -113,6 +142,10 @@ class HTTPRequest { // processed by a worker virtual bool SetupHandle( CURL *curl); // Configure the curl handle to be used by a given request. + + virtual bool + ContinueHandle(); // Continue the request processing after a pause. + CurlResult ProcessCurlResult( CURL *curl, CURLcode rv); // Process a curl command that ran to completion. @@ -122,6 +155,11 @@ class HTTPRequest { // (curl request did not complete) bool ReleaseHandle( CURL *curl); // Cleanup any resources associated with the curl handle + CURL *getHandle() const { return m_curl_handle; } + + // Sets whether the command is "in-progress" (has a currently-running curl + // handle). 
+ void SetInProgress(bool in_progress) { m_in_progress = in_progress; } const TokenFile *m_token{nullptr}; @@ -138,10 +176,17 @@ class HTTPRequest { m_mtx; // Mutex guarding the results from the curl worker's callback std::condition_variable m_cv; // Condition variable to notify the curl // worker completed the callback - bool m_result_ready{false}; // Flag indicating the results data is ready. + bool m_final{false}; // Flag indicating this is the last sendPreparedRequest + // call of the overall HTTPRequest + bool m_in_progress{false}; // Flag indicating this command is in progress. + bool m_is_streaming{ + false}; // Flag indicating this command is a streaming request. + bool m_result_ready{false}; // Flag indicating the results data is ready. + off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown. std::string m_protocol; std::string m_uri; // URL to request from libcurl - std::string m_payload; + std::string_view m_payload; + CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl unsigned m_retry_count{0}; }; diff --git a/src/S3Commands.cc b/src/S3Commands.cc index 587994b..a6d9db9 100644 --- a/src/S3Commands.cc +++ b/src/S3Commands.cc @@ -33,6 +33,7 @@ #include #include #include +#include AmazonRequest::~AmazonRequest() {} @@ -40,8 +41,10 @@ bool AmazonRequest::SendRequest() { query_parameters.insert(std::make_pair("Version", "2012-10-01")); switch (signatureVersion) { - case 4: - return sendV4Request(canonicalizeQueryString()); + case 4: { + auto qs = canonicalizeQueryString(); + return sendV4Request(qs, qs.size(), true, true); + } default: this->errorCode = "E_INTERNAL"; this->errorMessage = "Invalid signature version."; @@ -133,7 +136,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, hexEncoded); } -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength) { return AWSv4Impl::doSha256(payload, messageDigest, mdLength); } @@ -142,7 +145,7 @@ std::string pathEncode(const std::string &original) { return AWSv4Impl::pathEncode(original); } -bool AmazonRequest::createV4Signature(const std::string &payload, +bool AmazonRequest::createV4Signature(const std::string_view payload, std::string &authorizationValue, bool sendContentSHA) { // If we're using temporary credentials, we need to add the token @@ -225,18 +228,21 @@ bool AmazonRequest::createV4Signature(const std::string &payload, // The canonical payload hash is the lowercase hexadecimal string of the // (SHA256) hash value of the payload. 
- unsigned int mdLength = 0; - unsigned char messageDigest[EVP_MAX_MD_SIZE]; - if (!doSha256(payload, messageDigest, &mdLength)) { - this->errorCode = "E_INTERNAL"; - this->errorMessage = "Unable to hash payload."; - return false; - } std::string payloadHash; - convertMessageDigestToLowercaseHex(messageDigest, mdLength, payloadHash); if (sendContentSHA) { - headers["X-Amz-Content-Sha256"] = payloadHash; + unsigned int mdLength = 0; + unsigned char messageDigest[EVP_MAX_MD_SIZE]; + if (!doSha256(payload, messageDigest, &mdLength)) { + this->errorCode = "E_INTERNAL"; + this->errorMessage = "Unable to hash payload."; + return false; + } + convertMessageDigestToLowercaseHex(messageDigest, mdLength, + payloadHash); + } else { + payloadHash = "UNSIGNED-PAYLOAD"; } + headers["X-Amz-Content-Sha256"] = payloadHash; // The canonical list of headers is a sorted list of lowercase header // names paired via ':' with the trimmed header value, each pair @@ -315,16 +321,18 @@ bool AmazonRequest::createV4Signature(const std::string &payload, // // Hash the canonical request the way we did the payload. + std::string canonicalRequestHash; + unsigned int mdLength = 0; + unsigned char messageDigest[EVP_MAX_MD_SIZE]; if (!doSha256(canonicalRequest, messageDigest, &mdLength)) { - this->errorCode = "E_INTERNAL"; - this->errorMessage = "Unable to hash canonical request."; + errorCode = "E_INTERNAL"; + errorMessage = "Unable to hash canonical request."; return false; } - std::string canonicalRequestHash; convertMessageDigestToLowercaseHex(messageDigest, mdLength, canonicalRequestHash); - std::string s = this->service; + std::string s = service; if (s.empty()) { size_t i = host.find("."); if (i != std::string::npos) { @@ -408,8 +416,9 @@ bool AmazonRequest::createV4Signature(const std::string &payload, return true; } -bool AmazonRequest::sendV4Request(const std::string &payload, - bool sendContentSHA) { +bool AmazonRequest::sendV4Request(const std::string_view payload, + off_t payload_size, bool sendContentSHA, + bool final) { if ((getProtocol() != "http") && (getProtocol() != "https")) { this->errorCode = "E_INVALID_SERVICE_URL"; this->errorMessage = "Service URL not of a known protocol (http[s])."; @@ -438,41 +447,38 @@ bool AmazonRequest::sendV4Request(const std::string &payload, if (!canonicalQueryString.empty()) { url += "?" + canonicalQueryString; } - return sendPreparedRequest(url, payload); + return sendPreparedRequest(url, payload, payload_size, final); } // It's stated in the API documentation that you can upload to any region // via us-east-1, which is moderately crazy. -bool AmazonRequest::SendS3Request(const std::string &payload) { +bool AmazonRequest::SendS3Request(const std::string_view payload, + off_t payload_size, bool final) { + if (!m_streamingRequest && !final) { + if (payload_size == 0) { + errorCode = "E_INTERNAL"; + errorMessage = "S3 does not support streaming requests where the " + "payload size is unknown"; + return false; + } + m_streamingRequest = true; + } headers["Content-Type"] = "binary/octet-stream"; - std::string contentLength; - formatstr(contentLength, "%zu", payload.size()); - headers["Content-Length"] = contentLength; - // Another undocumented CURL feature: transfer-encoding is "chunked" - // by default for "PUT", which we really don't want. 
- headers["Transfer-Encoding"] = ""; + service = "s3"; if (region.empty()) { region = "us-east-1"; } - return sendV4Request(payload, true); + return sendV4Request(payload, payload_size, !m_streamingRequest, final); } // --------------------------------------------------------------------------- AmazonS3Upload::~AmazonS3Upload() {} -bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset, - size_t size) { - if (offset != 0 || size != 0) { - std::string range; - formatstr(range, "bytes=%lld-%lld", static_cast(offset), - static_cast(offset + size - 1)); - headers["Range"] = range.c_str(); - } - +bool AmazonS3Upload::SendRequest(const std::string &payload) { httpVerb = "PUT"; - return SendS3Request(payload); + return SendS3Request(payload, payload.size(), true); } // --------------------------------------------------------------------------- @@ -496,7 +502,7 @@ bool AmazonS3CompleteMultipartUpload::SendRequest( } payload += ""; - return SendS3Request(payload); + return SendS3Request(payload, payload.size(), true); } // --------------------------------------------------------------------------- @@ -508,17 +514,18 @@ bool AmazonS3CreateMultipartUpload::SendRequest() { query_parameters["x-id"] = "CreateMultipartUpload"; httpVerb = "POST"; - return SendS3Request(""); + return SendS3Request("", 0, true); } -bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload, +bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, const std::string &partNumber, - const std::string &uploadId) { + const std::string &uploadId, + size_t payloadSize, bool final) { query_parameters["partNumber"] = partNumber; query_parameters["uploadId"] = uploadId; includeResponseHeader = true; httpVerb = "PUT"; - return SendS3Request(payload); + return SendS3Request(payload, payloadSize, final); } // --------------------------------------------------------------------------- @@ -536,7 +543,7 @@ bool AmazonS3Download::SendRequest(off_t offset, size_t size) { httpVerb = "GET"; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed); + return SendS3Request(noPayloadAllowed, 0, true); } // --------------------------------------------------------------------------- @@ -547,7 +554,7 @@ bool AmazonS3Head::SendRequest() { httpVerb = "HEAD"; includeResponseHeader = true; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed); + return SendS3Request(noPayloadAllowed, 0, true); } // --------------------------------------------------------------------------- @@ -565,7 +572,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { // Operation is on the bucket itself; alter the URL to remove the object hostUrl = getProtocol() + "://" + host + bucketPath; - return SendS3Request(""); + return SendS3Request("", 0, true); } bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId, diff --git a/src/S3Commands.hh b/src/S3Commands.hh index 56a5a29..d8519d8 100644 --- a/src/S3Commands.hh +++ b/src/S3Commands.hh @@ -22,8 +22,12 @@ #include "S3AccessInfo.hh" #include +#include #include +// The base class for all requests to the S3 endpoint. +// Handles common activities like signing requests and forwarding to the +// underlying HTTPRequest object. 
class AmazonRequest : public HTTPRequest { public: AmazonRequest(const S3AccessInfo &ai, const std::string objectName, @@ -81,14 +85,35 @@ class AmazonRequest : public HTTPRequest { std::string &path); virtual bool SendRequest(); - virtual bool SendS3Request(const std::string &payload); + + // Send a request to the S3 service. + // + // - payload: contents of the request itself + // - payload_size: final size of the payload for uploads; 0 if unknown. + // - final: True if this is the last (or only) payload of the request; false + // otherwise + virtual bool SendS3Request(const std::string_view payload, + off_t payload_size, bool final); static void Init(XrdSysError &log) { HTTPRequest::Init(log); } protected: - bool sendV4Request(const std::string &payload, bool sendContentSHA = false); + // Send a request to the S3 service using the V4 signing method. + // + // - payload: contents of the request (for uploads or for XML-based + // commands) + // - payload_size: final size of the payload for uploads; 0 if unknown. + // - sendContentSHA: Whether to add the header indicating the checksum of + // the final payload. Servers may verify this is what they received. + // - final: True if this is the last (or only) payload of the request; false + // otherwise. + bool sendV4Request(const std::string_view payload, off_t payload_size, + bool sendContentSHA, bool final); bool retainObject; + bool m_streamingRequest{ + false}; // Is this a streaming request? Streaming requests will not + // include a SHA-256 signature in the header std::string accessKeyFile; std::string secretKeyFile; @@ -110,14 +135,14 @@ class AmazonRequest : public HTTPRequest { std::string m_style; private: - bool createV4Signature(const std::string &payload, + bool createV4Signature(const std::string_view payload, std::string &authorizationHeader, bool sendContentSHA = false); std::string canonicalizeQueryString(); }; -class AmazonS3Upload : public AmazonRequest { +class AmazonS3Upload final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -133,14 +158,13 @@ class AmazonS3Upload : public AmazonRequest { virtual ~AmazonS3Upload(); - virtual bool SendRequest(const std::string &payload, off_t offset, - size_t size); + bool SendRequest(const std::string &payload); protected: std::string path; }; -class AmazonS3CreateMultipartUpload : public AmazonRequest { +class AmazonS3CreateMultipartUpload final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -189,7 +213,7 @@ class AmazonS3CompleteMultipartUpload : public AmazonRequest { protected: }; -class AmazonS3SendMultipartPart : public AmazonRequest { +class AmazonS3SendMultipartPart final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -207,14 +231,22 @@ class AmazonS3SendMultipartPart : public AmazonRequest { virtual ~AmazonS3SendMultipartPart(); - virtual bool SendRequest(const std::string &payload, - const std::string &partNumber, - const std::string &uploadId); + // Send (potentially a partial) payload up to S3. + // Blocks until all the data in payload has been sent to AWS. + // + // - payload: The data corresponding to this partial upload. + // - partNumber: The portion of the multipart upload. 
+ // - uploadId: The upload ID assigned by the creation of the multipart + // upload + // - final: Set to true if this is the last of the part; false otherwise + bool SendRequest(const std::string_view payload, + const std::string &partNumber, const std::string &uploadId, + size_t payloadSize, bool final); protected: }; -class AmazonS3Download : public AmazonRequest { +class AmazonS3Download final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -233,7 +265,7 @@ class AmazonS3Download : public AmazonRequest { virtual bool SendRequest(off_t offset, size_t size); }; -class AmazonS3Head : public AmazonRequest { +class AmazonS3Head final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -250,6 +282,11 @@ class AmazonS3Head : public AmazonRequest { virtual ~AmazonS3Head(); virtual bool SendRequest(); + + off_t getSize() const { return m_size; } + + private: + off_t m_size{0}; }; struct S3ObjectInfo { @@ -257,7 +294,7 @@ struct S3ObjectInfo { std::string m_key; }; -class AmazonS3List : public AmazonRequest { +class AmazonS3List final : public AmazonRequest { using AmazonRequest::SendRequest; public: diff --git a/src/S3File.cc b/src/S3File.cc index 6ad133e..90a4b43 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -32,6 +32,7 @@ #include +#include #include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include using namespace XrdHTTPServer; @@ -49,14 +51,29 @@ XrdVERSIONINFO(XrdOssGetFileSystem, S3); S3File::S3File(XrdSysError &log, S3FileSystem *oss) : m_log(log), m_oss(oss), content_length(0), last_modified(0), - write_buffer(""), partNumber(1) {} + partNumber(1) {} int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { if (Oflag & O_CREAT) { - m_log.Log(LogMask::Info, "File opened for creation: ", path); + m_log.Log(LogMask::Info, "Open", "File opened for creation:", path); + m_create = true; } if (Oflag & O_APPEND) { - m_log.Log(LogMask::Info, "File opened for append: ", path); + m_log.Log(LogMask::Info, "Open", "File opened for append:", path); + } + + char *asize_char; + if ((asize_char = env.Get("oss.asize"))) { + off_t result{0}; + auto [ptr, ec] = std::from_chars( + asize_char, asize_char + strlen(asize_char), result); + if (ec == std::errc()) { + m_object_size = result; + } else { + m_log.Log(LogMask::Warning, + "Opened file has oss.asize set to an unparseable value: ", + asize_char); + } } if (m_log.getMsgMask() & XrdHTTPServer::Debug) { @@ -81,12 +98,13 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { // This flag is not set when it's going to be a read operation // so we check if the file exists in order to be able to return a 404 - if (!Oflag) { + if (!Oflag || (Oflag & O_APPEND)) { AmazonS3Head head(m_ai, m_object, m_log); if (!head.SendRequest()) { return -ENOENT; } + head.getSize(); } return 0; @@ -188,66 +206,123 @@ int S3File::Fstat(struct stat *buff) { } ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { + if (offset != m_write_offset) { + m_log.Emsg( + "Write", + "Out-of-order write detected; S3 requires writes to be in order"); + return -EIO; + } if (uploadId == "") { AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log); if (!startUpload.SendRequest()) { - m_log.Emsg("Open", "S3 multipart request failed"); + m_log.Emsg("Write", "S3 multipart request failed"); return -ENOENT; } std::string errMsg; startUpload.Results(uploadId, errMsg); } - std::string payload((char *)buffer, size); - size_t payload_size = payload.length(); - 
if (payload_size != size) { - return -ENOENT; - } - write_buffer += payload; + size_t written = 0; + while (written != size) { + if (m_write_op) { + auto write_size = ContinueSendPart(buffer, size); + if (write_size < 0) { + return write_size; + } + offset += write_size; + m_write_offset += write_size; + buffer = static_cast(buffer) + write_size; + size -= write_size; + written += write_size; + if (!size) { + return written; + } + } - // XXX should this be configurable? 100mb gives us a TB of file size. It - // doesn't seem terribly useful to be much smaller and it's not clear the S3 - // API will work if it's much larger. - if (write_buffer.length() > 100000000) { - if (SendPart() == -ENOENT) { - return -ENOENT; + m_write_op.reset(new AmazonS3SendMultipartPart(m_ai, m_object, m_log)); + + // Calculate the size of the current chunk, if it's known. + m_part_size = m_s3_part_size; + if (!m_object_size) { + m_part_size = 0; + } else if (m_write_offset + static_cast(m_part_size) > + m_object_size) { + m_part_size = m_object_size - m_write_offset; } } - return size; + return written; } -int S3File::SendPart() { - int length = write_buffer.length(); - AmazonS3SendMultipartPart upload_part_request = - AmazonS3SendMultipartPart(m_ai, m_object, m_log); - if (!upload_part_request.SendRequest( - write_buffer, std::to_string(partNumber), uploadId)) { - m_log.Emsg("SendPart", "upload.SendRequest() failed"); - return -ENOENT; - } else { - m_log.Emsg("SendPart", "upload.SendRequest() succeeded"); - std::string resultString = upload_part_request.getResultString(); +ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { + m_part_written += size; + auto write_size = size; + if (m_part_written > m_s3_part_size) { + write_size = size - (m_part_written - m_s3_part_size); + m_part_written = m_s3_part_size; + } + auto is_final = (m_part_size > 0 && m_part_written == m_part_size) || + m_part_written == m_s3_part_size; + if (m_log.getMsgMask() & LogMask::Dump) { + std::stringstream ss; + ss << "Sending request with buffer of size=" << write_size + << " and is_final=" << is_final; + m_log.Log(LogMask::Dump, "ContinueSendPart", ss.str().c_str()); + } + if (!m_write_op->SendRequest( + std::string_view(static_cast(buffer), write_size), + std::to_string(partNumber), uploadId, m_object_size, is_final)) { + m_log.Emsg("Write", "Upload to S3 failed: ", + m_write_op->getErrorMessage().c_str()); + return -EIO; + } + if (is_final) { + m_part_written = 0; + m_part_size = 0; + auto &resultString = m_write_op->getResultString(); std::size_t startPos = resultString.find("ETag:"); + if (startPos == std::string::npos) { + m_log.Emsg("Write", "Result from S3 does not include ETag:", + resultString.c_str()); + return -EIO; + } std::size_t endPos = resultString.find("\"", startPos + 7); + if (startPos == std::string::npos) { + m_log.Emsg("Write", + "Result from S3 does not include ETag end-character:", + resultString.c_str()); + return -EIO; + } eTags.push_back( resultString.substr(startPos + 7, endPos - startPos - 7)); - + m_write_op.reset(); partNumber++; - write_buffer = ""; } - return length; + return write_size; } int S3File::Close(long long *retsz) { - // this is only true if a buffer exists that needs to be drained - if (write_buffer.length() > 0) { - if (SendPart() == -ENOENT) { + // If we opened the object in create mode but did not actually write + // anything, make a quick zero-length file. 
+	if (m_create && !m_write_offset) {
+		AmazonS3Upload upload(m_ai, m_object, m_log);
+		if (!upload.SendRequest("")) {
+			m_log.Emsg("Close", "Failed to create zero-length file");
			return -ENOENT;
		} else {
-			m_log.Emsg("Close", "Closed our S3 file");
+			m_log.Emsg("Open", "upload.SendRequest() succeeded");
+			return 0;
		}
	}
+	if (m_write_op) {
+		m_part_size = m_part_written;
+		auto written = ContinueSendPart(nullptr, 0);
+		if (written < 0) {
+			m_log.Emsg("Close", "Failed to complete the last S3 upload");
+			return -ENOENT;
+		}
+	}
+
	// this is only true if some parts have been written and need to be
	// finalized
	if (partNumber > 1) {
diff --git a/src/S3File.hh b/src/S3File.hh
index 250cb77..7bfb337 100644
--- a/src/S3File.hh
+++ b/src/S3File.hh
@@ -33,6 +33,8 @@
 int parse_path(const S3FileSystem &fs, const char *path,
	       std::string &exposedPath, std::string &object);

+class AmazonS3SendMultipartPart;
+
 class S3File : public XrdOssDF {
   public:
	S3File(XrdSysError &log, S3FileSystem *oss);
@@ -95,7 +97,7 @@ class S3File : public XrdOssDF {
	time_t getLastModified() { return last_modified; }

  private:
-	int SendPart();
+	ssize_t ContinueSendPart(const void *buffer, size_t size);
	XrdSysError &m_log;
	S3FileSystem *m_oss;

@@ -105,8 +107,20 @@ class S3File : public XrdOssDF {
	size_t content_length;
	time_t last_modified;

-	std::string write_buffer;
-	std::string uploadId;
+	static const size_t m_s3_part_size =
+		100'000'000; // The size of each S3 chunk.
+
+	bool m_create{false};
	int partNumber;
+	size_t m_part_written{
+		0}; // Number of bytes written for the current upload chunk.
+	size_t m_part_size{0}; // Size of the current upload chunk (0 if unknown).
+	off_t m_write_offset{0}; // Offset of the file pointer for writes (helps
+				 // detect out-of-order writes).
+	off_t m_object_size{
+		-1}; // Expected size of the completed object; -1 if unknown.
+	std::string uploadId; // For creates, upload ID as assigned by the S3 service.
	std::vector<std::string> eTags;
+	std::unique_ptr<AmazonS3SendMultipartPart>
+		m_write_op; // The in-progress operation for a multi-part upload.
}; From 4183ba02753f859cea0bccb7e620ff05848e5989 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 28 Oct 2024 09:39:31 -0500 Subject: [PATCH 05/11] clang-format of the curl headers --- src/CurlUtil.hh | 34 +++++++++++++++++----------------- src/CurlWorker.hh | 38 ++++++++++++++++++-------------------- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/CurlUtil.hh b/src/CurlUtil.hh index d97fbe9..9457403 100644 --- a/src/CurlUtil.hh +++ b/src/CurlUtil.hh @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include // Forward dec'ls typedef void CURL; @@ -43,25 +43,25 @@ CURL *GetHandle(bool verbose); * multi-curl driver thread is based on polling FD's */ class HandlerQueue { -public: - HandlerQueue(); + public: + HandlerQueue(); - void Produce(HTTPRequest *handler); + void Produce(HTTPRequest *handler); - HTTPRequest *Consume(); - HTTPRequest *TryConsume(); + HTTPRequest *Consume(); + HTTPRequest *TryConsume(); - int PollFD() const {return m_read_fd;} + int PollFD() const { return m_read_fd; } - CURL *GetHandle(); - void RecycleHandle(CURL *); + CURL *GetHandle(); + void RecycleHandle(CURL *); -private: - std::deque m_ops; - thread_local static std::vector m_handles; - std::condition_variable m_cv; - std::mutex m_mutex; - const static unsigned m_max_pending_ops{20}; - int m_read_fd{-1}; - int m_write_fd{-1}; + private: + std::deque m_ops; + thread_local static std::vector m_handles; + std::condition_variable m_cv; + std::mutex m_mutex; + const static unsigned m_max_pending_ops{20}; + int m_read_fd{-1}; + int m_write_fd{-1}; }; diff --git a/src/CurlWorker.hh b/src/CurlWorker.hh index 928dd11..8671be5 100644 --- a/src/CurlWorker.hh +++ b/src/CurlWorker.hh @@ -29,24 +29,22 @@ class HTTPRequest; class HandlerQueue; class CurlWorker { -public: - CurlWorker(std::shared_ptr queue, XrdSysError &logger) : - m_queue(queue), - m_logger(logger) - {} - - CurlWorker(const CurlWorker &) = delete; - - void Run(); - static void RunStatic(CurlWorker *myself); - static unsigned GetPollThreads() {return m_workers;} - -private: - std::shared_ptr m_queue; - std::unordered_map m_op_map; - XrdSysError &m_logger; - - const static unsigned m_workers{5}; - const static unsigned m_max_ops{20}; - const static unsigned m_marker_period{5}; + public: + CurlWorker(std::shared_ptr queue, XrdSysError &logger) + : m_queue(queue), m_logger(logger) {} + + CurlWorker(const CurlWorker &) = delete; + + void Run(); + static void RunStatic(CurlWorker *myself); + static unsigned GetPollThreads() { return m_workers; } + + private: + std::shared_ptr m_queue; + std::unordered_map m_op_map; + XrdSysError &m_logger; + + const static unsigned m_workers{5}; + const static unsigned m_max_ops{20}; + const static unsigned m_marker_period{5}; }; From 7ea062f2e60343ff4e11a874c11917cf9162a44f Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 23 Nov 2024 22:51:49 -0600 Subject: [PATCH 06/11] Avoid unpausing from a separate thread We want to always unpause a given operation from the same thread that is handling it. If a separate thread can pick up the operation, there is a race condition where both the original one and new one are operating on the same `CURL *` handle at the same time, resulting in an observed segfault. This commit introduces a separate "unpause" queue for each curl worker; this queue is notified by the parent when there is additional data available. 
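In outline, the scheme is the following sketch (simplified stand-in types for
illustration, not the actual CurlWorker/HandlerQueue classes): each worker
drains its private unpause queue before taking new work from the shared queue,
so a paused curl handle is only ever resumed on the thread that set it up.

    // Sketch only: illustrative stand-ins for the worker-loop structure.
    #include <deque>
    #include <mutex>

    struct Request {
        void Continue() { /* e.g. curl_easy_pause(handle, CURLPAUSE_CONT) */ }
    };

    class Queue {
        std::deque<Request *> m_ops;
        std::mutex m_mutex;

      public:
        void Produce(Request *req) {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_ops.push_back(req);
        }
        Request *TryConsume() {
            std::lock_guard<std::mutex> lk(m_mutex);
            if (m_ops.empty())
                return nullptr;
            auto *req = m_ops.front();
            m_ops.pop_front();
            return req;
        }
    };

    void WorkerIteration(Queue &shared, Queue &unpause) {
        // Resume paused handles first -- always on this worker's thread,
        // so no other thread ever touches the same CURL * concurrently.
        while (auto *req = unpause.TryConsume())
            req->Continue();
        // Only brand-new requests are taken from the shared queue.
        while (auto *req = shared.TryConsume()) {
            /* create a fresh curl handle and attach it to the multi handle */
        }
    }

The parent produces into the worker's unpause queue instead of the shared one,
which removes the window where two workers could drive one handle at once.
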
--- src/CurlUtil.cc | 24 ++++++++++++++++-------- src/CurlWorker.hh | 3 +++ src/HTTPCommands.cc | 13 +++++++++---- src/HTTPCommands.hh | 31 ++++++++++++++++++++----------- test/s3_unit_tests.cc | 15 ++++++++++----- 5 files changed, 58 insertions(+), 28 deletions(-) diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 2b4cc05..8cc10f0 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -187,6 +187,7 @@ void CurlWorker::Run() { // is waiting on it is undefined behavior. auto queue_ref = m_queue; auto &queue = *queue_ref.get(); + m_unpause_queue.reset(new HandlerQueue()); m_logger.Log(LogMask::Debug, "Run", "Started a curl worker"); CURLM *multi_handle = curl_multi_init(); @@ -199,23 +200,29 @@ void CurlWorker::Run() { CURLMcode mres = CURLM_OK; std::vector waitfds; - waitfds.resize(1); + waitfds.resize(2); waitfds[0].fd = queue.PollFD(); waitfds[0].events = CURL_WAIT_POLLIN; waitfds[0].revents = 0; + waitfds[1].fd = m_unpause_queue->PollFD(); + waitfds[1].events = CURL_WAIT_POLLIN; + waitfds[1].revents = 0; while (true) { + while (running_handles < static_cast(m_max_ops)) { + auto op = m_unpause_queue->TryConsume(); + if (!op) { + break; + } + op->ContinueHandle(); + } while (running_handles < static_cast(m_max_ops)) { auto op = running_handles == 0 ? queue.Consume() : queue.TryConsume(); if (!op) { break; } - if (op->inProgress()) { - op->ContinueHandle(); - continue; - } - op->SetInProgress(true); + op->SetUnpauseQueue(m_unpause_queue); auto curl = queue.GetHandle(); if (curl == nullptr) { @@ -282,7 +289,8 @@ void CurlWorker::Run() { } else if (mres != CURLM_OK) { if (m_logger.getMsgMask() & LogMask::Warning) { std::stringstream ss; - ss << "Failed to perform multi-handle operation: " << mres; + ss << "Failed to perform multi-handle operation: " + << curl_multi_strerror(mres); m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str()); } break; @@ -314,8 +322,8 @@ void CurlWorker::Run() { queue.RecycleHandle(iter->first); } else { curl_easy_cleanup(iter->first); - m_op_map.erase(iter); } + m_op_map.erase(iter); } } while (msg); } diff --git a/src/CurlWorker.hh b/src/CurlWorker.hh index 8671be5..6e14a84 100644 --- a/src/CurlWorker.hh +++ b/src/CurlWorker.hh @@ -41,6 +41,9 @@ class CurlWorker { private: std::shared_ptr m_queue; + std::shared_ptr + m_unpause_queue; // Queue for notifications that a handle can be + // unpaused. std::unordered_map m_op_map; XrdSysError &m_logger; diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index fdcaa70..2ae8332 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -228,7 +228,7 @@ void HTTPRequest::Payload::NotifyPaused() { m_parent.Notify(); } // A callback function that gets passed to curl_easy_setopt for reading data // from the payload -size_t read_callback(char *buffer, size_t size, size_t n, void *v) { +size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) { // The callback gets the void pointer that we set with CURLOPT_READDATA. 
In // this case, it's a pointer to an HTTPRequest::Payload struct that contains // the data to be sent, along with the offset of the data that has already @@ -283,7 +283,11 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri, } m_result_ready = false; - m_queue->Produce(this); + if (m_unpause_queue) { + m_unpause_queue->Produce(this); + } else { + m_queue->Produce(this); + } std::unique_lock lk(m_mtx); m_cv.wait(lk, [&] { return m_result_ready; }); @@ -328,7 +332,7 @@ bool HTTPRequest::ContinueHandle() { m_callback_payload->data = m_payload; m_callback_payload->final = m_final; m_callback_payload->sentSoFar = 0; - curl_easy_pause(m_curl_handle, 0); + curl_easy_pause(m_curl_handle, CURLPAUSE_CONT); return true; } @@ -408,7 +412,8 @@ bool HTTPRequest::SetupHandle(CURL *curl) { return false; } - rv = curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback); + rv = curl_easy_setopt(curl, CURLOPT_READFUNCTION, + HTTPRequest::ReadCallback); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index bfdc088..67a08b8 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -100,16 +100,24 @@ class HTTPRequest { const std::string_view payload, off_t payload_size, bool final); - const std::string &getProtocol() { return m_protocol; } - // Returns true if the command is in-progress. - bool inProgress() const { return m_in_progress; } + // Called by the curl handler thread that the request has been finished. + virtual void Notify(); + + const std::string &getProtocol() { return m_protocol; } // Returns true if the command is a streaming/partial request. // A streaming request is one that requires multiple calls to // `sendPreparedRequest` to complete. bool isStreamingRequest() const { return m_is_streaming; } + // Record the unpause queue associated with this request. + // + // Future continuations of this request will be sent directly to this queue. + void SetUnpauseQueue(std::shared_ptr queue) { + m_unpause_queue = queue; + } + typedef std::map AttributeValueMap; AttributeValueMap query_parameters; AttributeValueMap headers; @@ -138,8 +146,6 @@ class HTTPRequest { private: enum class CurlResult { Ok, Fail, Retry }; - void Notify(); // Notify the main request thread the request has been - // processed by a worker virtual bool SetupHandle( CURL *curl); // Configure the curl handle to be used by a given request. @@ -157,9 +163,9 @@ class HTTPRequest { CURL *curl); // Cleanup any resources associated with the curl handle CURL *getHandle() const { return m_curl_handle; } - // Sets whether the command is "in-progress" (has a currently-running curl - // handle). - void SetInProgress(bool in_progress) { m_in_progress = in_progress; } + // Callback for libcurl when the library is ready to read more data from our + // buffer. + static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v); const TokenFile *m_token{nullptr}; @@ -168,17 +174,20 @@ class HTTPRequest { m_workers_initialized; // The global state of the worker initialization. static std::shared_ptr m_queue; // Global queue for all HTTP requests to be processed. + std::shared_ptr m_unpause_queue{ + nullptr}; // Queue to notify the request can be resumed. static std::vector m_workers; // Set of all the curl worker threads. // The following variables manage the state of the request. 
std::mutex m_mtx; // Mutex guarding the results from the curl worker's callback - std::condition_variable m_cv; // Condition variable to notify the curl - // worker completed the callback + + // Condition variable to notify the curl worker completed the callback. + std::condition_variable m_cv; + bool m_final{false}; // Flag indicating this is the last sendPreparedRequest // call of the overall HTTPRequest - bool m_in_progress{false}; // Flag indicating this command is in progress. bool m_is_streaming{ false}; // Flag indicating this command is a streaming request. bool m_result_ready{false}; // Flag indicating the results data is ready. diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index 9ab72c0..1b0eba0 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -292,7 +292,7 @@ s3.end public: void WritePattern(const std::string &name, const off_t writeSize, - const char chunkByte, const size_t chunkSize) { + const unsigned char chunkByte, const size_t chunkSize) { XrdSysLogger log; S3FileSystem fs(&log, m_configfn.c_str(), nullptr); @@ -308,12 +308,11 @@ s3.end ? static_cast(writeSize) : chunkSize; off_t curWriteSize = writeSize; - char curChunkByte = chunkByte; + auto curChunkByte = chunkByte; off_t offset = 0; while (sizeToWrite) { std::string writeBuffer(sizeToWrite, curChunkByte); - std::cerr << "Writing bytes at offset: " << offset << std::endl; rv = fh->Write(writeBuffer.data(), offset, sizeToWrite); ASSERT_EQ(rv, static_cast(sizeToWrite)); @@ -333,7 +332,8 @@ s3.end private: void VerifyContents(S3FileSystem &fs, const std::string &obj, - off_t expectedSize, char chunkByte, size_t chunkSize) { + off_t expectedSize, unsigned char chunkByte, + size_t chunkSize) { std::unique_ptr fh(fs.newFile()); ASSERT_TRUE(fh); @@ -344,7 +344,7 @@ s3.end size_t sizeToRead = (static_cast(chunkSize) >= expectedSize) ? expectedSize : chunkSize; - char curChunkByte = chunkByte; + unsigned char curChunkByte = chunkByte; off_t offset = 0; while (sizeToRead) { std::string readBuffer(sizeToRead, curChunkByte - 1); @@ -383,6 +383,11 @@ TEST_F(FileSystemS3Fixture, UploadZero) { WritePattern("/test/write_zero.txt", 0, 'X', 32 * 1024); } +// Upload larger - two chunks. +TEST_F(FileSystemS3Fixture, UploadTwoChunks) { + WritePattern("/test/write_two_chunks.txt", 1'024 + 42, 'a', 1'024); +} + // Upload larger - a few chunks. TEST_F(FileSystemS3Fixture, UploadMultipleChunks) { WritePattern("/test/write_multi_chunks.txt", (10'000 / 1'024) * 1'024 + 42, From f195be487844cdd1c596ec2f2e06f611c05dd35c Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Wed, 27 Nov 2024 15:44:50 -0600 Subject: [PATCH 07/11] Time out in-progress transfers If an entire part is a single libcurl operation, a client that starts writing - but then leaves for an extended period of time - will leave dangling references inside libcurl (eventually exhausting the number of allowable transfers). This commit adds a background thread for S3File that will go through all pending uploads and check to see if they are still live; if not, then it'll timeout the operation. 
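The bookkeeping reduces to the sketch below (illustrative names; only the tick
callback and stall window mirror what this commit adds): every send stamps a
last-progress time, and the monitor thread periodically "ticks" each in-flight
upload, flagging it once the window elapses so the next read callback aborts.

    // Sketch only: a reduced model of the stall detection.
    #include <chrono>

    class UploadState {
        using Clock = std::chrono::steady_clock;
        Clock::time_point m_last_progress{Clock::now()};
        bool m_timed_out{false};

      public:
        static constexpr std::chrono::seconds m_stall{10};

        // Called whenever the client sends more data.
        void RecordProgress() { m_last_progress = Clock::now(); }

        // Called periodically by the monitor thread.
        void Tick(Clock::time_point now) {
            if (!m_timed_out && now - m_last_progress > m_stall)
                m_timed_out = true; // read callback returns
                                    // CURL_READFUNC_ABORT on its next call
        }

        bool TimedOut() const { return m_timed_out; }
    };

Aborting from the read callback keeps the teardown on the curl worker thread,
consistent with the single-thread-per-handle rule from the previous commit.
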
--- src/HTTPCommands.cc | 51 +++++++++++++++++++-- src/HTTPCommands.hh | 29 +++++++++++- src/S3File.cc | 103 ++++++++++++++++++++++++++++++++++++++++++ src/S3File.hh | 48 +++++++++++++++++++- test/s3_unit_tests.cc | 29 ++++++++++++ 5 files changed, 253 insertions(+), 7 deletions(-) diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index 2ae8332..c5c714e 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -43,6 +43,8 @@ std::shared_ptr HTTPRequest::m_queue = std::make_unique(); bool HTTPRequest::m_workers_initialized = false; std::vector HTTPRequest::m_workers; +std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration = + std::chrono::seconds(10); namespace { @@ -235,6 +237,12 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) { // been sent. HTTPRequest::Payload *payload = (HTTPRequest::Payload *)v; + if (payload->m_parent.Timeout()) { + payload->m_parent.errorCode = "E_TIMEOUT"; + payload->m_parent.errorMessage = "Upload operation timed out"; + return CURL_READFUNC_ABORT; + } + if (payload->sentSoFar == static_cast(payload->data.size())) { payload->sentSoFar = 0; if (payload->final) { @@ -269,6 +277,16 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri, if (!m_is_streaming && !final) { m_is_streaming = true; } + if (m_timeout) { + errorCode = "E_TIMEOUT"; + errorMessage = "Transfer has timed out due to inactivity."; + return false; + } + if (!errorCode.empty()) { + return false; + } + + m_last_request = std::chrono::steady_clock::now(); m_final = final; // Detect whether we were given an undersized buffer in non-streaming mode if (!m_is_streaming && payload_size && @@ -294,6 +312,27 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri, return errorCode.empty(); } +void HTTPRequest::Tick(std::chrono::steady_clock::time_point now) { + if (!m_is_streaming) { + return; + } + if (now - m_last_request <= m_timeout_duration) { + return; + } + + if (m_timeout) { + return; + } + m_timeout = true; + + if (m_unpause_queue) { + std::unique_lock lk(m_mtx); + m_result_ready = false; + m_unpause_queue->Produce(this); + m_cv.wait(lk, [&] { return m_result_ready; }); + } +} + bool HTTPRequest::ReleaseHandle(CURL *curl) { m_curl_handle = nullptr; @@ -604,11 +643,13 @@ HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl, auto unique = std::unique_ptr((void *)1, cleaner); if (rv != 0) { - errorCode = "E_CURL_IO"; - std::ostringstream error; - error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv) - << "'."; - errorMessage = error.str(); + if (errorCode.empty()) { + errorCode = "E_CURL_IO"; + std::ostringstream error; + error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv) + << "'."; + errorMessage = error.str(); + } return CurlResult::Fail; } diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 67a08b8..6f8f39d 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -81,6 +81,20 @@ class HTTPRequest { // context. static void Init(XrdSysError &); + // Perform maintenance of the request. + void Tick(std::chrono::steady_clock::time_point); + + // Sets the duration after which an in-progress operation may be considered + // stalled and hence timeout. + static void SetStallTimeout(std::chrono::steady_clock::duration timeout) { + m_timeout_duration = timeout; + } + + // Return the stall timeout duration currently in use. + static std::chrono::steady_clock::duration GetStallTimeout() { + return m_timeout_duration; + } + protected: // Send the request to the HTTP server. 
// Blocks until the request has completed. @@ -100,7 +114,6 @@ class HTTPRequest { const std::string_view payload, off_t payload_size, bool final); - // Called by the curl handler thread that the request has been finished. virtual void Notify(); @@ -118,6 +131,10 @@ class HTTPRequest { m_unpause_queue = queue; } + // Return whether or not the request has timed out since the last + // call to send more data. + bool Timeout() const { return m_timeout; } + typedef std::map AttributeValueMap; AttributeValueMap query_parameters; AttributeValueMap headers; @@ -190,6 +207,7 @@ class HTTPRequest { // call of the overall HTTPRequest bool m_is_streaming{ false}; // Flag indicating this command is a streaming request. + bool m_timeout{false}; // Flag indicating the request has timed out. bool m_result_ready{false}; // Flag indicating the results data is ready. off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown. std::string m_protocol; @@ -198,6 +216,15 @@ class HTTPRequest { CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl unsigned m_retry_count{0}; + + // Time when the last request was sent on this object; used to determine + // whether the operation has timed out. + std::chrono::steady_clock::time_point m_last_request{ + std::chrono::steady_clock::now()}; + + // Duration after which a partially-completed request will timeout if + // no progress has been made. + static std::chrono::steady_clock::duration m_timeout_duration; }; class HTTPUpload : public HTTPRequest { diff --git a/src/S3File.cc b/src/S3File.cc index 90a4b43..405d05f 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -32,8 +32,10 @@ #include +#include #include #include +#include #include #include #include @@ -41,6 +43,7 @@ #include #include #include +#include #include using namespace XrdHTTPServer; @@ -49,6 +52,12 @@ S3FileSystem *g_s3_oss = nullptr; XrdVERSIONINFO(XrdOssGetFileSystem, S3); +std::vector, + std::weak_ptr>> + S3File::m_pending_ops; +std::mutex S3File::m_pending_lk; +std::once_flag S3File::m_monitor_launch; + S3File::S3File(XrdSysError &log, S3FileSystem *oss) : m_log(log), m_oss(oss), content_length(0), last_modified(0), partNumber(1) {} @@ -61,6 +70,9 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { if (Oflag & O_APPEND) { m_log.Log(LogMask::Info, "Open", "File opened for append:", path); } + if (Oflag & (O_RDWR | O_WRONLY)) { + m_write_lk.reset(new std::mutex); + } char *asize_char; if ((asize_char = env.Get("oss.asize"))) { @@ -206,16 +218,29 @@ int S3File::Fstat(struct stat *buff) { } ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { + auto write_mutex = m_write_lk; + if (!write_mutex) { + return -EBADF; + } + std::lock_guard lk(*write_mutex); + if (offset != m_write_offset) { m_log.Emsg( "Write", "Out-of-order write detected; S3 requires writes to be in order"); + m_write_offset = -1; + return -EIO; + } + if (m_write_offset == -1) { + // Previous I/O error has occurred. File is in bad state, immediately + // fail. 
return -EIO; } if (uploadId == "") { AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log); if (!startUpload.SendRequest()) { m_log.Emsg("Write", "S3 multipart request failed"); + m_write_offset = -1; return -ENOENT; } std::string errMsg; @@ -240,6 +265,10 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { } m_write_op.reset(new AmazonS3SendMultipartPart(m_ai, m_object, m_log)); + { + std::lock_guard lk(m_pending_lk); + m_pending_ops.emplace_back(m_write_lk, m_write_op); + } // Calculate the size of the current chunk, if it's known. m_part_size = m_s3_part_size; @@ -271,8 +300,15 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { if (!m_write_op->SendRequest( std::string_view(static_cast(buffer), write_size), std::to_string(partNumber), uploadId, m_object_size, is_final)) { + m_write_offset = -1; + if (m_write_op->getErrorCode() == "E_TIMEOUT") { + m_log.Emsg("Write", "Timeout when uploading to S3"); + m_write_op.reset(); + return -ETIMEDOUT; + } m_log.Emsg("Write", "Upload to S3 failed: ", m_write_op->getErrorMessage().c_str()); + m_write_op.reset(); return -EIO; } if (is_final) { @@ -283,6 +319,8 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { if (startPos == std::string::npos) { m_log.Emsg("Write", "Result from S3 does not include ETag:", resultString.c_str()); + m_write_op.reset(); + m_write_offset = -1; return -EIO; } std::size_t endPos = resultString.find("\"", startPos + 7); @@ -290,6 +328,8 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { m_log.Emsg("Write", "Result from S3 does not include ETag end-character:", resultString.c_str()); + m_write_op.reset(); + m_write_offset = -1; return -EIO; } eTags.push_back( @@ -301,6 +341,67 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { return write_size; } +void S3File::LaunchMonitorThread() { + std::call_once(m_monitor_launch, [] { + std::thread t(S3File::CleanupTransfers); + t.detach(); + }); +} + +void S3File::CleanupTransfers() { + while (true) { + std::this_thread::sleep_for(HTTPRequest::GetStallTimeout() / 3); + try { + CleanupTransfersOnce(); + } catch (std::exception &exc) { + std::cerr << "Warning: caught unexpected exception when trying to " + "clean transfers: " + << exc.what() << std::endl; + } + } +} + +void S3File::CleanupTransfersOnce() { + // Make a list of live transfers; erase any dead ones still on the list. + std::vector, + std::shared_ptr>> + existing_ops; + { + std::lock_guard lk(m_pending_lk); + existing_ops.reserve(m_pending_ops.size()); + m_pending_ops.erase( + std::remove_if(m_pending_ops.begin(), m_pending_ops.end(), + [&](const auto &op) -> bool { + auto op_lk = op.first.lock(); + if (!op_lk) { + // In this case, the S3File is no longer open + // for write. No need to potentially clean + // up the transfer. + return true; + } + auto op_part = op.second.lock(); + if (!op_part) { + // In this case, the S3File object is still + // open for writes but the upload has + // completed. Remove from the list. + return true; + } + // The S3File is open and upload is in-progress; + // we'll tick the transfer. + existing_ops.emplace_back(op_lk, op_part); + return false; + }), + m_pending_ops.end()); + } + // For each live transfer, call `Tick` to advance the clock and possibly + // time things out. 
+ auto now = std::chrono::steady_clock::now(); + for (auto &info : existing_ops) { + std::lock_guard lk(*info.first); + info.second->Tick(now); + } +} + int S3File::Close(long long *retsz) { // If we opened the object in create mode but did not actually write // anything, make a quick zero-length file. @@ -315,6 +416,7 @@ int S3File::Close(long long *retsz) { } } if (m_write_op) { + std::lock_guard lk(*m_write_lk); m_part_size = m_part_written; auto written = ContinueSendPart(nullptr, 0); if (written < 0) { @@ -375,6 +477,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger, envP->Export("XRDXROOTD_NOPOSC", "1"); + S3File::LaunchMonitorThread(); try { AmazonRequest::Init(*log); g_s3_oss = new S3FileSystem(Logger, config_fn, envP); diff --git a/src/S3File.hh b/src/S3File.hh index 7bfb337..bb44b63 100644 --- a/src/S3File.hh +++ b/src/S3File.hh @@ -27,6 +27,8 @@ #include #include +#include +#include #include @@ -96,7 +98,21 @@ class S3File : public XrdOssDF { size_t getContentLength() { return content_length; } time_t getLastModified() { return last_modified; } + // Launch the global monitor thread associated with S3File objects. + // Currently, the monitor thread is used to cleanup in-progress file + // transfers that have been abandoned. + static void LaunchMonitorThread(); + private: + // Periodic cleanup of in-progress transfers. + // + // Iterates through the global list of pending multipart uploads + // that may be paused. For each, call `Tick` on the upload and + // see if the transfer has aborted. + static void CleanupTransfers(); + + static void CleanupTransfersOnce(); + ssize_t ContinueSendPart(const void *buffer, size_t size); XrdSysError &m_log; S3FileSystem *m_oss; @@ -121,6 +137,36 @@ class S3File : public XrdOssDF { -1}; // Expected size of the completed object; -1 if unknown. std::string uploadId; // For creates, upload ID as assigned by t std::vector eTags; - std::unique_ptr + + // The mutex protecting write activities. Writes must currently be + // serialized as we aggregate them into large operations and upload them to + // the S3 endpoint. The mutex prevents corruption of internal state. + // + // The periodic cleanup thread may decide to abort the in-progress transfer; + // to do so, it'll need a reference to this lock that is independent of the + // lifetime of the open file; hence, it's a shared pointer. + std::shared_ptr m_write_lk; + + // The in-progress operation for a multi-part upload; its lifetime may be + // spread across multiple write calls. + std::shared_ptr m_write_op; // The in-progress operation for a multi-part upload. + + // The multipart uploads represent an in-progress request and the global + // cleanup thread may decide to trigger a failure if the request does not + // advance after some time period. + // + // To do so, we must be able to lock the associated write mutex and then + // call `Tick` on the upload. To avoid prolonging the lifetime of the + // objects beyond the S3File, we hold onto a reference via a weak pointer. + // Mutable operations on this vector are protected by the `m_pending_lk`. + static std::vector, + std::weak_ptr>> + m_pending_ops; + + // Mutex protecting the m_pending_ops variable. + static std::mutex m_pending_lk; + + // Flag determining whether the monitoring thread has been launched. 
+ static std::once_flag m_monitor_launch; }; diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index 1b0eba0..b13f14a 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -22,6 +22,7 @@ // #include "../src/S3Commands.hh" +#include "../src/S3File.hh" #include "../src/S3FileSystem.hh" #include "../src/shortfile.hh" @@ -427,6 +428,34 @@ TEST_F(FileSystemS3Fixture, UploadMultiPartUnaligned) { WritePattern("/test/write_large_1.txt", 100'000'000, 'a', 32'768); } +// Ensure that uploads timeout if no action occurs. +TEST_F(FileSystemS3Fixture, UploadStall) { + HTTPRequest::SetStallTimeout(std::chrono::milliseconds(200)); + S3File::LaunchMonitorThread(); + + XrdSysLogger log; + S3FileSystem fs(&log, m_configfn.c_str(), nullptr); + + std::unique_ptr fh(fs.newFile()); + ASSERT_TRUE(fh); + + XrdOucEnv env; + env.Put("oss.asize", std::to_string(16'384).c_str()); + auto rv = fh->Open("/test/write_stall.txt", O_CREAT | O_WRONLY, 0755, env); + ASSERT_EQ(rv, 0); + + ssize_t sizeToWrite = 4'096; + std::string writeBuffer(sizeToWrite, 'a'); + rv = fh->Write(writeBuffer.data(), 0, sizeToWrite); + ASSERT_EQ(rv, sizeToWrite); + + std::this_thread::sleep_for(HTTPRequest::GetStallTimeout() * 4 / 3 + + std::chrono::milliseconds(10)); + writeBuffer = std::string(sizeToWrite, 'b'); + rv = fh->Write(writeBuffer.data(), sizeToWrite, sizeToWrite); + ASSERT_EQ(rv, -ETIMEDOUT); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From 4973bf44d4264f58f60da2fce8497d481f09016d Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Wed, 27 Nov 2024 15:52:11 -0600 Subject: [PATCH 08/11] Notify from the main curl worker loop After the notification is done, the request may be deleted by the owning S3File instance. Do not call `Notify` from within the curl result processing function as the request object needs to be alive to release the curl handle. --- src/CurlUtil.cc | 1 + src/HTTPCommands.cc | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 8cc10f0..2b6b2fc 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -315,6 +315,7 @@ void CurlWorker::Run() { "Processing result from curl"); op->ProcessCurlResult(iter->first, res); op->ReleaseHandle(iter->first); + op->Notify(); running_handles -= 1; curl_multi_remove_handle(multi_handle, iter->first); if (res == CURLE_OK) { diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index c5c714e..9cdf2b5 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -639,9 +639,6 @@ void HTTPRequest::Notify() { HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl, CURLcode rv) { - auto cleaner = [&](void *) { Notify(); }; - auto unique = std::unique_ptr((void *)1, cleaner); - if (rv != 0) { if (errorCode.empty()) { errorCode = "E_CURL_IO"; From a8e588b78f9f620d9dc86a64a24e0ce76c9b28a3 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Thu, 28 Nov 2024 11:54:34 -0600 Subject: [PATCH 09/11] Restore buffering mode for objects of unknown size If the client doesn't pre-declare the size of the object it will write, we don't know the size of the last part of the upload. Hence, we must switch back to buffer mode in this case. 
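The resulting policy is roughly the sketch below (an assumed helper for
illustration, with the 100 MB constant hard-coded to match m_s3_part_size):
when oss.asize was provided we can stream each part, otherwise every part must
be accumulated in memory before it is sent, since S3 needs each part's size up
front.

    // Sketch only: streamed parts when the size is known, buffered otherwise.
    #include <cstddef>
    #include <string>
    #include <string_view>

    constexpr size_t kPartSize = 100'000'000; // matches m_s3_part_size

    struct UploadPolicy {
        long long object_size = -1; // -1: client did not declare oss.asize
        std::string buffer;         // only used when the size is unknown

        void Write(std::string_view data) {
            if (object_size >= 0) {
                /* stream 'data' into the in-progress part; the final part's
                   size is computable from object_size, so no buffering */
            } else {
                buffer.append(data);
                if (buffer.size() > kPartSize) {
                    /* flush 'buffer' as one complete part */
                    buffer.clear();
                }
            }
        }
    };
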
--- src/S3File.cc | 79 +++++++++++++++++++++++++++++++++---------- src/S3File.hh | 13 +++++++ test/s3_unit_tests.cc | 46 ++++++++++++++++++------- 3 files changed, 107 insertions(+), 31 deletions(-) diff --git a/src/S3File.cc b/src/S3File.cc index 405d05f..9bcf27c 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -247,6 +247,12 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { startUpload.Results(uploadId, errMsg); } + // If we don't know the final object size, we must use the streaming + // variant. + if (m_object_size == -1) { + return WriteStreaming(buffer, offset, size); + } + size_t written = 0; while (written != size) { if (m_write_op) { @@ -282,6 +288,41 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { return written; } +ssize_t S3File::WriteStreaming(const void *buffer, off_t offset, size_t size) { + m_streaming_buffer.append( + std::string_view(static_cast(buffer), size)); + m_write_offset += size; + + ssize_t rv = size; + if (m_streaming_buffer.size() > 100'000'000) { + rv = SendPartStreaming(); + } + return rv; +} + +ssize_t S3File::SendPartStreaming() { + int length = m_streaming_buffer.length(); + AmazonS3SendMultipartPart upload_part_request = + AmazonS3SendMultipartPart(m_ai, m_object, m_log); + if (!upload_part_request.SendRequest(m_streaming_buffer, + std::to_string(partNumber), uploadId, + m_streaming_buffer.size(), true)) { + m_log.Log(LogMask::Debug, "SendPart", "upload.SendRequest() failed"); + return -EIO; + } else { + m_log.Log(LogMask::Debug, "SendPart", "upload.SendRequest() succeeded"); + std::string resultString = upload_part_request.getResultString(); + std::size_t startPos = resultString.find("ETag:"); + std::size_t endPos = resultString.find("\"", startPos + 7); + eTags.push_back( + resultString.substr(startPos + 7, endPos - startPos - 7)); + partNumber++; + m_streaming_buffer.clear(); + } + + return length; +} + ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { m_part_written += size; auto write_size = size; @@ -408,20 +449,32 @@ int S3File::Close(long long *retsz) { if (m_create && !m_write_offset) { AmazonS3Upload upload(m_ai, m_object, m_log); if (!upload.SendRequest("")) { - m_log.Emsg("Close", "Failed to create zero-length file"); + m_log.Log(LogMask::Warning, "Close", + "Failed to create zero-length object"); return -ENOENT; } else { - m_log.Emsg("Open", "upload.SendRequest() succeeded"); + m_log.Log(LogMask::Debug, "Close", + "Creation of zero-length object succeeded"); return 0; } } - if (m_write_op) { + if (m_write_lk) { std::lock_guard lk(*m_write_lk); - m_part_size = m_part_written; - auto written = ContinueSendPart(nullptr, 0); - if (written < 0) { - m_log.Emsg("Close", "Failed to complete the last S3 upload"); - return -ENOENT; + if (m_object_size == -1 && !m_streaming_buffer.empty()) { + m_log.Emsg("Close", "Sending final part of length", + std::to_string(m_streaming_buffer.size()).c_str()); + auto rv = SendPartStreaming(); + if (rv < 0) { + return rv; + } + } else if (m_write_op) { + m_part_size = m_part_written; + auto written = ContinueSendPart(nullptr, 0); + if (written < 0) { + m_log.Log(LogMask::Warning, "Close", + "Failed to complete the last S3 upload"); + return -EIO; + } } } @@ -439,16 +492,6 @@ int S3File::Close(long long *retsz) { } return 0; - - /* Original write code - std::string payload((char *)buffer, size); - if (!upload.SendRequest(payload, offset, size)) { - m_log.Emsg("Open", "upload.SendRequest() failed"); - return -ENOENT; - } else { - m_log.Emsg("Open", 
"upload.SendRequest() succeeded"); - return 0; - } */ } extern "C" { diff --git a/src/S3File.hh b/src/S3File.hh index bb44b63..abd69ee 100644 --- a/src/S3File.hh +++ b/src/S3File.hh @@ -111,8 +111,18 @@ class S3File : public XrdOssDF { // see if the transfer has aborted. static void CleanupTransfers(); + // Single cleanup run for in-progress transfers. static void CleanupTransfersOnce(); + // Write data while in "streaming mode" where we don't know the + // ultimate size of the file (and hence can't start streaming + // partitions immediately). + ssize_t WriteStreaming(const void *buffer, off_t offset, size_t size); + + // Send a fully-buffered part of the file; only used while in + // "streaming" mode. + ssize_t SendPartStreaming(); + ssize_t ContinueSendPart(const void *buffer, size_t size); XrdSysError &m_log; S3FileSystem *m_oss; @@ -137,6 +147,9 @@ class S3File : public XrdOssDF { -1}; // Expected size of the completed object; -1 if unknown. std::string uploadId; // For creates, upload ID as assigned by t std::vector eTags; + // When using the "streaming mode", the upload part has to be completely + // buffered within the S3File object; this is the current buffer. + std::string m_streaming_buffer; // The mutex protecting write activities. Writes must currently be // serialized as we aggregate them into large operations and upload them to diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index b13f14a..ca37714 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -293,7 +293,8 @@ s3.end public: void WritePattern(const std::string &name, const off_t writeSize, - const unsigned char chunkByte, const size_t chunkSize) { + const unsigned char chunkByte, const size_t chunkSize, + bool known_size) { XrdSysLogger log; S3FileSystem fs(&log, m_configfn.c_str(), nullptr); @@ -301,7 +302,11 @@ s3.end ASSERT_TRUE(fh); XrdOucEnv env; - env.Put("oss.asize", std::to_string(writeSize).c_str()); + // Only set oss.asize for test cases where we want the server to know + // the final size. + if (known_size) { + env.Put("oss.asize", std::to_string(writeSize).c_str()); + } auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env); ASSERT_EQ(rv, 0); @@ -371,61 +376,76 @@ s3.end // Upload a single byte into S3 TEST_F(FileSystemS3Fixture, UploadOneByte) { - WritePattern("/test/write_one.txt", 1, 'X', 32 * 1024); + WritePattern("/test/write_one.txt", 1, 'X', 32 * 1024, true); + WritePattern("/test/write_one_stream.txt", 1, 'X', 32 * 1024, false); } // Upload across multiple calls, single part TEST_F(FileSystemS3Fixture, UploadMultipleCalls) { - WritePattern("/test/write_alphabet.txt", 26, 'a', 1); + WritePattern("/test/write_alphabet.txt", 26, 'a', 1, true); + WritePattern("/test/write_alphabet_stream.txt", 26, 'a', 1, false); } // Upload a zero-byte object TEST_F(FileSystemS3Fixture, UploadZero) { - WritePattern("/test/write_zero.txt", 0, 'X', 32 * 1024); + WritePattern("/test/write_zero.txt", 0, 'X', 32 * 1024, true); + WritePattern("/test/write_zero_stream.txt", 0, 'X', 32 * 1024, false); } // Upload larger - two chunks. TEST_F(FileSystemS3Fixture, UploadTwoChunks) { - WritePattern("/test/write_two_chunks.txt", 1'024 + 42, 'a', 1'024); + WritePattern("/test/write_two_chunks.txt", 1'024 + 42, 'a', 1'024, true); + WritePattern("/test/write_two_chunks_stream.txt", 1'024 + 42, 'a', 1'024, + false); } // Upload larger - a few chunks. 
TEST_F(FileSystemS3Fixture, UploadMultipleChunks) { WritePattern("/test/write_multi_chunks.txt", (10'000 / 1'024) * 1'024 + 42, - 'a', 1'024); + 'a', 1'024, true); + WritePattern("/test/write_multi_chunks_stream.txt", + (10'000 / 1'024) * 1'024 + 42, 'a', 1'024, false); } // Upload across multiple parts, not aligned to partition. TEST_F(FileSystemS3Fixture, UploadLarge) { WritePattern("/test/write_large_1.txt", - (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720); + (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720, + true); + WritePattern("/test/write_large_1_stream.txt", + (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720, + false); } // Upload a file into S3 that's the same size as the partition size TEST_F(FileSystemS3Fixture, UploadLargePart) { - WritePattern("/test/write_large_2.txt", 100'000'000, 'a', 131'072); + WritePattern("/test/write_large_2.txt", 100'000'000, 'a', 131'072, true); + WritePattern("/test/write_large_2_stream.txt", 100'000'000, 'a', 131'072, + false); } // Upload a small file where the partition size is aligned with the chunk size TEST_F(FileSystemS3Fixture, UploadSmallAligned) { - WritePattern("/test/write_large_3.txt", 1'000, 'a', 1'000); + WritePattern("/test/write_large_3.txt", 1'000, 'a', 1'000, true); } // Upload a file into S3 that's the same size as the partition size, using // chunks that align with the partition size TEST_F(FileSystemS3Fixture, UploadLargePartAligned) { - WritePattern("/test/write_large_4.txt", 100'000'000, 'a', 1'000'000); + WritePattern("/test/write_large_4.txt", 100'000'000, 'a', 1'000'000, true); } // Upload a file into S3 resulting in multiple partitions TEST_F(FileSystemS3Fixture, UploadMultiPartAligned) { - WritePattern("/test/write_large_5.txt", 100'000'000, 'a', 1'000'000); + WritePattern("/test/write_large_5.txt", 100'000'000, 'a', 1'000'000, true); } // Upload a file into S3 resulting in multiple partitioned using not-aligned // chunks TEST_F(FileSystemS3Fixture, UploadMultiPartUnaligned) { - WritePattern("/test/write_large_1.txt", 100'000'000, 'a', 32'768); + WritePattern("/test/write_large_1.txt", 100'000'000, 'a', 32'768, true); + WritePattern("/test/write_large_1_stream.txt", 100'000'000, 'a', 32'768, + false); } // Ensure that uploads timeout if no action occurs. From 10dea9b5a9da7c81d87d176d777bf746e6a25f1f Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 29 Nov 2024 19:50:42 -0600 Subject: [PATCH 10/11] Small file upload optimization If the entire object is uploaded during a single `Write` call, then skip the multipart upload and just do a single non-buffered upload. 
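The fast path boils down to a predicate like the following (a hypothetical
helper; the real change inlines the test in S3File::Write): a first write at
offset zero that covers the whole declared size can be satisfied by one PUT
instead of the CreateMultipartUpload / UploadPart / CompleteMultipartUpload
sequence, saving two round trips per small object.

    // Sketch only: hypothetical predicate for the single-PUT fast path.
    #include <cstddef>

    bool FullObjectInFirstWrite(long long write_offset, long long object_size,
                                size_t write_size) {
        // Offset 0, a declared size, and a write covering all of it.
        return write_offset == 0 && object_size >= 0 &&
               static_cast<long long>(write_size) == object_size;
    }
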
---
 src/S3Commands.cc |  2 +-
 src/S3Commands.hh |  2 +-
 src/S3File.cc     | 19 +++++++++++++++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/src/S3Commands.cc b/src/S3Commands.cc
index a6d9db9..a8b0848 100644
--- a/src/S3Commands.cc
+++ b/src/S3Commands.cc
@@ -476,7 +476,7 @@ bool AmazonRequest::SendS3Request(const std::string_view payload,

 AmazonS3Upload::~AmazonS3Upload() {}

-bool AmazonS3Upload::SendRequest(const std::string &payload) {
+bool AmazonS3Upload::SendRequest(const std::string_view &payload) {
	httpVerb = "PUT";
	return SendS3Request(payload, payload.size(), true);
 }
diff --git a/src/S3Commands.hh b/src/S3Commands.hh
index d8519d8..3e72d2e 100644
--- a/src/S3Commands.hh
+++ b/src/S3Commands.hh
@@ -158,7 +158,7 @@ class AmazonS3Upload final : public AmazonRequest {

	virtual ~AmazonS3Upload();

-	bool SendRequest(const std::string &payload);
+	bool SendRequest(const std::string_view &payload);

  protected:
	std::string path;
diff --git a/src/S3File.cc b/src/S3File.cc
index 9bcf27c..fa6a56f 100644
--- a/src/S3File.cc
+++ b/src/S3File.cc
@@ -224,6 +224,25 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
	}
	std::lock_guard lk(*write_mutex);

+	// Small object optimization -- if this is the full object, upload
+	// it immediately.
+	if (!m_write_offset && m_object_size == static_cast<off_t>(size)) {
+		AmazonS3Upload upload(m_ai, m_object, m_log);
+		m_write_lk.reset();
+		if (!upload.SendRequest(
+				std::string_view(static_cast<const char *>(buffer), size))) {
+			m_log.Log(LogMask::Warning, "Write",
+				  "Failed to create small object");
+			return -EIO;
+		} else {
+			m_write_offset += size;
+			m_log.Log(LogMask::Debug, "Write",
+				  "Creation of small object succeeded",
+				  std::to_string(size).c_str());
+			return size;
+		}
+	}
+
	if (offset != m_write_offset) {
		m_log.Emsg(
			"Write",

From 4ca389588574e709acc64e7a96e305f33e6a1275 Mon Sep 17 00:00:00 2001
From: Brian Bockelman
Date: Mon, 2 Dec 2024 16:56:36 -0600
Subject: [PATCH 11/11] Correctly refactor unit tests

The unit test refactoring left copy/pasted code. This commit splits the
common piece into a single header, allowing `s3_tests.cc` to keep the
unit tests that utilize AWS S3 while `s3_unit_tests.cc` uses the minio
instance started up by ctest.
{ m_logger.Log(LogMask::Debug, "Run", - "Unable to setup the curl handle"); + "Unable to set up the curl handle"); op->Fail("E_NOMEM", - "Failed to setup the curl handle for the operation"); + "Failed to set up the curl handle for the operation"); continue; } m_op_map[curl] = op; diff --git a/test/s3_tests.cc b/test/s3_tests.cc index 099b334..66c3892 100644 --- a/test/s3_tests.cc +++ b/test/s3_tests.cc @@ -18,11 +18,10 @@ #include "../src/S3Commands.hh" #include "../src/S3FileSystem.hh" -#include "../src/shortfile.hh" +#include "s3_tests_common.hh" #include #include -#include #include class TestAmazonRequest : public AmazonRequest { @@ -65,41 +64,6 @@ TEST(TestS3URLGeneration, Test1) { ASSERT_EQ(generatedHostUrl, "https://s3-service.com:443/test-object"); } -class FileSystemFixtureBase : public testing::Test { - protected: - FileSystemFixtureBase() - : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation - {} - - void SetUp() override { - setenv("XRDINSTANCE", "xrootd", 1); - char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX"; - auto result = mkstemp(tmp_configfn); - ASSERT_NE(result, -1) << "Failed to create temp file (" - << strerror(errno) << ", errno=" << errno << ")"; - m_configfn = std::string(tmp_configfn); - - auto contents = GetConfig(); - ASSERT_FALSE(contents.empty()); - ASSERT_TRUE(writeShortFile(m_configfn, contents, 0)) - << "Failed to write to temp file (" << strerror(errno) - << ", errno=" << errno << ")"; - } - - void TearDown() override { - if (!m_configfn.empty()) { - auto rv = unlink(m_configfn.c_str()); - ASSERT_EQ(rv, 0) << "Failed to delete temp file (" - << strerror(errno) << ", errno=" << errno << ")"; - } - } - - virtual std::string GetConfig() = 0; - - std::string m_configfn; - std::unique_ptr m_log; -}; - class FileSystemS3VirtualBucket : public FileSystemFixtureBase { protected: virtual std::string GetConfig() override { diff --git a/test/s3_tests_common.hh b/test/s3_tests_common.hh new file mode 100644 index 0000000..d5e797c --- /dev/null +++ b/test/s3_tests_common.hh @@ -0,0 +1,64 @@ +/*************************************************************** + * + * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ***************************************************************/ + +#include "../src/shortfile.hh" + +#include +#include + +#include +#include + +#include +#include +#include + +class FileSystemFixtureBase : public testing::Test { + protected: + FileSystemFixtureBase() + : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation + {} + + void SetUp() override { + setenv("XRDINSTANCE", "xrootd", 1); + char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX"; + auto result = mkstemp(tmp_configfn); + ASSERT_NE(result, -1) << "Failed to create temp file (" + << strerror(errno) << ", errno=" << errno << ")"; + m_configfn = std::string(tmp_configfn); + + auto contents = GetConfig(); + ASSERT_FALSE(contents.empty()); + ASSERT_TRUE(writeShortFile(m_configfn, contents, 0)) + << "Failed to write to temp file (" << strerror(errno) + << ", errno=" << errno << ")"; + } + + void TearDown() override { + if (!m_configfn.empty()) { + auto rv = unlink(m_configfn.c_str()); + ASSERT_EQ(rv, 0) << "Failed to delete temp file (" + << strerror(errno) << ", errno=" << errno << ")"; + } + } + + virtual std::string GetConfig() = 0; + + std::string m_configfn; + std::unique_ptr m_log; +}; diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index ca37714..d09bab4 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -24,11 +24,10 @@ #include "../src/S3Commands.hh" #include "../src/S3File.hh" #include "../src/S3FileSystem.hh" -#include "../src/shortfile.hh" +#include "s3_tests_common.hh" #include #include -#include #include #include @@ -71,186 +70,6 @@ void parseEnvFile(const std::string &fname) { } } -class TestAmazonRequest : public AmazonRequest { - public: - XrdSysLogger log{}; - XrdSysError err{&log, "TestS3CommandsLog"}; - - TestAmazonRequest(const std::string &url, const std::string &akf, - const std::string &skf, const std::string &bucket, - const std::string &object, const std::string &path, - int sigVersion) - : AmazonRequest(url, akf, skf, bucket, object, path, sigVersion, err) {} - - // For getting access to otherwise-protected members - std::string getHostUrl() const { return hostUrl; } -}; - -TEST(TestS3URLGeneration, Test1) { - const std::string serviceUrl = "https://s3-service.com:443"; - const std::string b = "test-bucket"; - const std::string o = "test-object"; - - // Test path-style URL generation - TestAmazonRequest pathReq{serviceUrl, "akf", "skf", b, o, "path", 4}; - std::string generatedHostUrl = pathReq.getHostUrl(); - ASSERT_EQ(generatedHostUrl, - "https://s3-service.com:443/test-bucket/test-object"); - - // Test virtual-style URL generation - TestAmazonRequest virtReq{serviceUrl, "akf", "skf", b, o, "virtual", 4}; - generatedHostUrl = virtReq.getHostUrl(); - ASSERT_EQ(generatedHostUrl, - "https://test-bucket.s3-service.com:443/test-object"); - - // Test path-style with empty bucket (which we use for exporting an entire - // endpoint) - TestAmazonRequest pathReqNoBucket{serviceUrl, "akf", "skf", "", - o, "path", 4}; - generatedHostUrl = pathReqNoBucket.getHostUrl(); - ASSERT_EQ(generatedHostUrl, "https://s3-service.com:443/test-object"); -} - -class FileSystemFixtureBase : public testing::Test { - protected: - FileSystemFixtureBase() - : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation - {} - - void SetUp() override { - - setenv("XRDINSTANCE", "xrootd", 1); - char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX"; - auto result = mkstemp(tmp_configfn); - ASSERT_NE(result, -1) << "Failed to create temp file (" - << strerror(errno) << ", errno=" << 
errno << ")"; - m_configfn = std::string(tmp_configfn); - - auto contents = GetConfig(); - ASSERT_FALSE(contents.empty()); - ASSERT_TRUE(writeShortFile(m_configfn, contents, 0)) - << "Failed to write to temp file (" << strerror(errno) - << ", errno=" << errno << ")"; - } - - void TearDown() override { - if (!m_configfn.empty()) { - auto rv = unlink(m_configfn.c_str()); - ASSERT_EQ(rv, 0) << "Failed to delete temp file (" - << strerror(errno) << ", errno=" << errno << ")"; - } - } - - virtual std::string GetConfig() = 0; - - std::string m_configfn; - std::unique_ptr m_log; -}; - -class FileSystemS3VirtualBucket : public FileSystemFixtureBase { - protected: - virtual std::string GetConfig() override { - return R"( -s3.begin -s3.path_name /test -s3.bucket_name genome-browser -s3.service_name s3.amazonaws.com -s3.region us-east-1 -s3.service_url https://s3.us-east-1.amazonaws.com -s3.url_style virtual -s3.end -)"; - } -}; - -class FileSystemS3VirtualNoBucket : public FileSystemFixtureBase { - protected: - virtual std::string GetConfig() override { - return R"( -s3.begin -s3.path_name /test -s3.service_name s3.amazonaws.com -s3.region us-east-1 -s3.service_url https://s3.us-east-1.amazonaws.com -s3.url_style virtual -s3.end -)"; - } -}; - -class FileSystemS3PathBucket : public FileSystemFixtureBase { - protected: - virtual std::string GetConfig() override { - return R"( -s3.begin -s3.path_name /test -s3.service_name s3.amazonaws.com -s3.region us-east-1 -s3.bucket_name genome-browser -s3.service_url https://s3.us-east-1.amazonaws.com -s3.url_style path -s3.end -)"; - } -}; - -class FileSystemS3PathNoBucket : public FileSystemFixtureBase { - protected: - virtual std::string GetConfig() override { - return R"( -s3.begin -s3.path_name /test -s3.service_name s3.amazonaws.com -s3.region us-east-1 -s3.service_url https://s3.us-east-1.amazonaws.com -s3.url_style path -s3.end -)"; - } -}; - -// Regression test for when the service_url ends in a `/` -class FileSystemS3PathBucketSlash : public FileSystemFixtureBase { - protected: - virtual std::string GetConfig() override { - return R"( -s3.begin -s3.path_name /test -s3.service_name s3.amazonaws.com -s3.region us-east-1 -s3.bucket_name genome-browser -s3.service_url https://s3.us-east-1.amazonaws.com/ -s3.url_style path -s3.end -)"; - } -}; - -TEST_F(FileSystemS3VirtualBucket, Create) { - EXPECT_NO_THROW( - { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); -} - -TEST_F(FileSystemS3VirtualNoBucket, Create) { - EXPECT_NO_THROW( - { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); -} - -TEST_F(FileSystemS3PathBucket, Create) { - EXPECT_NO_THROW( - { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); -} - -TEST_F(FileSystemS3PathNoBucket, Create) { - EXPECT_NO_THROW( - { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); -} - -TEST_F(FileSystemS3PathBucketSlash, Create) { - EXPECT_NO_THROW( - { S3FileSystem fs(m_log.get(), m_configfn.c_str(), nullptr); }); -} - // Tests where we query S3 test fixture class FileSystemS3Fixture : public FileSystemFixtureBase { void SetUp() override {