Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf] Replaced the S3 connector implementation to use the AWS transfer manager #4272

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
02f830b
[Perf] Replaced the S3 connector implementation to use the AWS transf…
paul-amonson Mar 13, 2023
a199be7
[Fix] Format fixes via check.py.
paul-amonson Mar 13, 2023
6f5b82b
[Fix] Coding style issues.
paul-amonson Mar 15, 2023
5a6ae13
[Refactor,Fix] Fixed a couple review issues and refactored the Transf…
paul-amonson Mar 16, 2023
0b6842e
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Mar 20, 2023
1729fb5
[Fix] Proper log for long S3 waits.
paul-amonson Mar 20, 2023
e4e1457
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Mar 21, 2023
4db7e49
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Mar 30, 2023
91d5d63
[Fix] Attempt to fix CI.
paul-amonson Mar 31, 2023
15889d6
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Apr 3, 2023
8dccd52
[Fix] Add missing folder not created by new AWS SDK.
paul-amonson Apr 3, 2023
f93dae4
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Apr 3, 2023
e65b922
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Apr 4, 2023
9e397c8
[Fix] Fixed a source problem causing UT failures.
paul-amonson Apr 4, 2023
4fc8a5c
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Apr 6, 2023
ced8a1a
[Fix] Addressed review comments.
paul-amonson Apr 6, 2023
e9e638b
[Fix] Formatting.
paul-amonson Apr 6, 2023
d1fd64b
Merge branch 'facebookincubator:main' into improve_s3_performance
paul-amonson Apr 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ if(VELOX_ENABLE_S3)
if(AWSSDK_ROOT_DIR)
set(CMAKE_PREFIX_PATH ${AWSSDK_ROOT_DIR})
endif()
find_package(AWSSDK REQUIRED COMPONENTS s3;identity-management)
find_package(AWSSDK REQUIRED COMPONENTS s3;identity-management;transfer)
add_definitions(-DVELOX_ENABLE_S3)
endif()

Expand Down
5 changes: 3 additions & 2 deletions scripts/setup-adapters.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)}

# Builds and installs a static, size-minimized AWS C++ SDK (only the s3,
# identity-management and transfer components) into ${DEPENDENCY_DIR}/install.
# NOTE: the page scrape had merged the pre- and post-change diff lines here
# (two AWS_SDK_VERSION values and two cmake_install invocations); this is the
# resolved post-change version.
function install_aws-sdk-cpp {
  local AWS_REPO_NAME="aws/aws-sdk-cpp"
  local AWS_SDK_VERSION="1.11.37"

  github_checkout $AWS_REPO_NAME $AWS_SDK_VERSION --depth 1 --recurse-submodules
  cmake_install -DCMAKE_BUILD_TYPE=Debug -DBUILD_SHARED_LIBS:BOOL=OFF -DMINIMIZE_SIZE:BOOL=ON -DENABLE_TESTING:BOOL=OFF -DBUILD_ONLY:STRING="s3;identity-management;transfer" -DCMAKE_INSTALL_PREFIX="${DEPENDENCY_DIR}/install"
  # Newer SDK versions no longer create this directory; later steps expect it.
  mkdir -p /root/adapter-deps/install/bin
}

function install_libhdfs3 {
Expand Down
125 changes: 88 additions & 37 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,46 @@

#include <fmt/format.h>
#include <glog/logging.h>
#include <chrono>
#include <fstream>
#include <memory>
#include <stdexcept>
#include <thread>

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/StringUtils.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/transfer/TransferHandle.h>
#include <aws/transfer/TransferManager.h>

namespace facebook::velox {
namespace {
// Reference: https://issues.apache.org/jira/browse/ARROW-8692
// https://github.com/apache/arrow/blob/master/cpp/src/arrow/filesystem/s3fs.cc#L843
// A non-copying iostream. See
// https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
// A non-copying iostream over a caller-owned buffer: the inherited
// PreallocatedStreamBuf base wraps [data, data + nbytes), so stream reads and
// writes go directly to the caller's memory with no intermediate copy.
// NOTE: base-class order is load-bearing — the streambuf base must be fully
// constructed before std::iostream(this) binds to it. Do not reorder.
class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf,
public std::iostream {
public:
// The const_cast is required because PreallocatedStreamBuf takes a mutable
// unsigned char* even when the stream is only ever read from.
StringViewStream(const void* data, int64_t nbytes)
: Aws::Utils::Stream::PreallocatedStreamBuf(
reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
static_cast<size_t>(nbytes)),
std::iostream(this) {}
};

// By default, the AWS SDK reads object data into an auto-growing StringStream.
// To avoid copies, read directly into a pre-allocated buffer instead.
// See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
paul-amonson marked this conversation as resolved.
Show resolved Hide resolved
// functionally similar recipe.
// Returns a stream factory producing iostreams that write into
// [data, data + nbytes). Captures by value, so the factory itself is safe to
// invoke later; the caller still owns the buffer and must keep it alive for
// the duration of the request that uses the stream.
Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
}
// Thin Aws::IOStream over an externally owned streambuf; lets the transfer
// manager stream a download straight into a caller-provided buffer. Does NOT
// take ownership — the streambuf must outlive every stream wrapping it.
class UnderlyingStreamWrapper : public Aws::IOStream {
public:
explicit UnderlyingStreamWrapper(std::streambuf* buf) : Aws::IOStream(buf) {}
~UnderlyingStreamWrapper() override = default;
};

class S3ReadFile final : public ReadFile {
public:
S3ReadFile(const std::string& path, Aws::S3::S3Client* client)
: client_(client) {
S3ReadFile(
const std::string& path,
std::shared_ptr<Aws::S3::S3Client> client,
const std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>&
executor)
: client_(client), executor_(executor) {
bucketAndKeyFromS3Path(path, bucket_, key_);
}

Expand All @@ -83,6 +81,9 @@ class S3ReadFile final : public ReadFile {
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
VELOX_CHECK_GE(length_, 0);
Aws::Transfer::TransferManagerConfiguration transferConfig(executor_.get());
transferConfig.s3Client = client_;
transferManager_ = Aws::Transfer::TransferManager::Create(transferConfig);
}

std::string_view pread(uint64_t offset, uint64_t length, void* buffer)
Expand Down Expand Up @@ -151,30 +152,56 @@ class S3ReadFile final : public ReadFile {
// bytes.
void preadInternal(uint64_t offset, uint64_t length, char* position) const {
// Read the desired range of bytes.
Aws::S3::Model::GetObjectRequest request;
Aws::S3::Model::GetObjectResult result;

request.SetBucket(awsString(bucket_));
request.SetKey(awsString(key_));
std::stringstream ss;
ss << "bytes=" << offset << "-" << offset + length - 1;
request.SetRange(awsString(ss.str()));
request.SetResponseStreamFactory(
AwsWriteableStreamFactory(position, length));
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);
Aws::Utils::Stream::PreallocatedStreamBuf streamBuffer(
(unsigned char*)position, length);
auto downloadHandle =
transferManager_->DownloadFile(bucket_, key_, offset, length, [&]() {
return Aws::New<UnderlyingStreamWrapper>("TestTag", &streamBuffer);
});
int count = 1;
using namespace std::chrono;
uint64_t startTime =
duration_cast<milliseconds>(system_clock::now().time_since_epoch())
.count();
uint64_t now = startTime;
uint64_t logAt = startTime + 5000; // wait 5 seconds; then warn and backoff
auto status = downloadHandle->GetStatus();
while (status == Aws::Transfer::TransferStatus::NOT_STARTED ||
status == Aws::Transfer::TransferStatus::IN_PROGRESS) {
if (now > logAt) {
VLOG(1) << "S3 Request is taking a long time: "
<< (now - startTime) / 1000 << "s!\n";
count++;
logAt += count * 5000; // backoff for next warning
}
status = downloadHandle->GetStatus();
now = duration_cast<milliseconds>(system_clock::now().time_since_epoch())
paul-amonson marked this conversation as resolved.
Show resolved Hide resolved
.count();
}
if (downloadHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED)
paul-amonson marked this conversation as resolved.
Show resolved Hide resolved
VELOX_FAIL(
"{} from location: {}:{}",
"Failed to get S3 object using the transfer manager",
bucket_,
key_);
}

Aws::S3::S3Client* client_;
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<Aws::Transfer::TransferManager> transferManager_;
std::string bucket_;
std::string key_;
int64_t length_ = -1;
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor_;
};

} // namespace

namespace filesystems {

class S3Config {
public: // Constants
static const char* kS3TransferManagerMaxThreadsConfig;

public:
S3Config(const Config* config) : config_(config) {}

Expand Down Expand Up @@ -222,6 +249,10 @@ class S3Config {
"hive.s3.iam-role-session-name", std::string("velox-session"));
}

// Maximum thread count for the transfer-manager executor pool; falls back to
// 25 when "hive.s3.transfer-manager-max-threads" is not configured.
// NOTE(review): "Tranfer" is a typo in the method name — renaming would touch
// callers, so it is only flagged here.
int getTranferManagerMaxThreads() const {
return config_->get<int>(kS3TransferManagerMaxThreadsConfig, 25);
}

Aws::Utils::Logging::LogLevel getLogLevel() const {
auto level = config_->get("hive.s3.log-level", std::string("FATAL"));
// Convert to upper case.
Expand Down Expand Up @@ -251,6 +282,9 @@ class S3Config {
const Config* FOLLY_NONNULL config_;
};

const char* S3Config::kS3TransferManagerMaxThreadsConfig =
"hive.s3.transfer-manager-max-threads";

class S3FileSystem::Impl {
public:
Impl(const Config* config) : s3Config_(config) {
Expand All @@ -267,6 +301,9 @@ class S3FileSystem::Impl {
// log a message.
awsOptions.httpOptions.installSigPipeHandler = true;
Aws::InitAPI(awsOptions);
S3FileSystem::Impl::executor_ =
std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(
s3Config_.getTranferManagerMaxThreads());
}
}

Expand All @@ -275,6 +312,7 @@ class S3FileSystem::Impl {
if (newCount == 0) {
Aws::SDKOptions awsOptions;
awsOptions.loggingOptions.logLevel = s3Config_.getLogLevel();
client_.reset();
Aws::ShutdownAPI(awsOptions);
}
}
Expand Down Expand Up @@ -366,8 +404,17 @@ class S3FileSystem::Impl {
// Make it clear that the S3FileSystem instance owns the S3Client.
// Once the S3FileSystem is destroyed, the S3Client fails to work
// due to the Aws::ShutdownAPI invocation in the destructor.
Aws::S3::S3Client* s3Client() const {
return client_.get();
const std::shared_ptr<Aws::S3::S3Client>& s3Client() const {
return client_;
}

// Read-only access to the parsed S3 configuration backing this filesystem.
const S3Config& s3Config() const {
return s3Config_;
}

// Thread pool handed to each S3ReadFile's TransferManager. executor_ is a
// static member, so this is one pool per process, not per filesystem instance.
const std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>&
getExecutor() const {
return S3FileSystem::Impl::executor_;
}

std::string getLogLevelName() const {
Expand All @@ -378,9 +425,12 @@ class S3FileSystem::Impl {
const S3Config s3Config_;
std::shared_ptr<Aws::S3::S3Client> client_;
static std::atomic<size_t> initCounter_;
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor_;
};

std::atomic<size_t> S3FileSystem::Impl::initCounter_(0);
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
S3FileSystem::Impl::executor_(nullptr);
folly::once_flag S3FSInstantiationFlag;

S3FileSystem::S3FileSystem(std::shared_ptr<const Config> config)
Expand All @@ -398,7 +448,8 @@ std::string S3FileSystem::getLogLevelName() const {

// Opens the object at 'path' (presumably an s3:// URI stripped by s3Path —
// TODO confirm against s3Path) for reading. The returned file shares this
// filesystem's S3 client and transfer-manager executor, so it must not
// outlive the S3FileSystem.
// NOTE: the page scrape had merged the old two-argument S3ReadFile
// construction with the new three-argument one; this is the resolved
// post-change version.
std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(std::string_view path) {
  const std::string file = s3Path(path);
  auto s3file = std::make_unique<S3ReadFile>(
      file, impl_->s3Client(), impl_->getExecutor());
  s3file->initialize();
  return s3file;
}
Expand Down