Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add custom multi part upload fs #152

Merged
merged 12 commits into from
Oct 10, 2024
19 changes: 15 additions & 4 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <arrow/array/builder_primitive.h>
#include <arrow/util/key_value_metadata.h>
#include "filesystem/fs.h"
#include "common/config.h"

#define SKIP_IF_NOT_OK(status, st) \
if (!status.ok()) { \
Expand All @@ -50,20 +51,27 @@ class S3Fixture : public benchmark::Fixture {
const char* secret_key = std::getenv(kEnvSecretKey);
const char* endpoint_url = std::getenv(kEnvS3EndpointUrl);
const char* file_path = std::getenv(kEnvFilePath);
std::string uri = "file:///tmp";
auto conf = StorageConfig();
conf.uri = "file:///tmp/";
if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
uri = endpoint_url;
conf.uri = std::string(endpoint_url);
conf.access_key_id = std::string(access_key);
conf.access_key_value = std::string(secret_key);
conf.file_path = std::string(file_path);
}
storage_config_ = std::move(conf);

auto base = std::string();
auto factory = std::make_shared<FileSystemFactory>();
auto result = factory->BuildFileSystem(uri, &base);
auto result = factory->BuildFileSystem(conf, &base);
if (!result.ok()) {
state.SkipWithError("Failed to build file system!");
}
fs_ = std::move(result).value();
}

std::shared_ptr<arrow::fs::FileSystem> fs_;
StorageConfig storage_config_;
};

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
Expand Down Expand Up @@ -123,7 +131,10 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);

for (auto _ : st) {
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,35 @@
// limitations under the License.

#pragma once
#include <arrow/filesystem/filesystem.h>
#include <memory>
#include <string>

#include <sstream>

// NOTE(review): this using-directive sits at header scope (the file uses #pragma once), so every
// file that includes this header also gets all of std:: in scope. The declarations below already
// spell out std:: explicitly, so it looks removable -- confirm no includer depends on it first.
using namespace std;

namespace milvus_storage {

// Storage settings: location, credentials and custom part-upload options.
//
// Every string field defaults to empty, `use_custom_part_upload_size` to
// false and `part_size` to 0.
struct StorageConfig {
  std::string uri;
  std::string bucket_name;
  std::string access_key_id;
  std::string access_key_value;
  std::string file_path;
  std::string root_path;
  std::string cloud_provider;
  std::string region;
  // When true, `part_size` is meant to be honoured by the writer.
  // NOTE(review): `part_size` is presumably a size in bytes -- confirm against the consumer.
  bool use_custom_part_upload_size = false;
  int64_t part_size = 0;

  // Returns a bracketed, comma-separated "key=value" summary of this config.
  //
  // `part_size` is reported next to `use_custom_part_upload_size` so the
  // effective upload settings can be read from a single log line.
  // access_key_id, access_key_value and file_path are not part of the output
  // (presumably to keep credentials out of logs -- confirm before adding them).
  // The bool is streamed with default formatting, i.e. as 0 or 1.
  std::string ToString() const {
    std::stringstream ss;
    ss << "[uri=" << uri << ", bucket_name=" << bucket_name << ", root_path=" << root_path
       << ", cloud_provider=" << cloud_provider << ", region=" << region
       << ", use_custom_part_upload_size=" << use_custom_part_upload_size << ", part_size=" << part_size << "]";
    return ss.str();
  }
};

static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
Expand All @@ -32,4 +55,4 @@ static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

} // namespace milvus_storage
} // namespace milvus_storage
51 changes: 51 additions & 0 deletions cpp/include/milvus-storage/common/path_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include "arrow/status.h"

namespace milvus_storage {

// Separator character between path components; the trailing-slash and parent-splitting helpers
// in this header compare against it.
constexpr char kSep = '/';

// Builds an Arrow IOError status whose message is "Not a regular file: "
// followed by `path`.
//
// Marked inline because the definition lives in a header: a non-inline
// definition would be emitted by every translation unit that includes this
// file and collide at link time.
inline arrow::Status NotAFile(std::string_view path) {
  return arrow::Status::IOError("Not a regular file: " + std::string(path));
}

// Returns true when `s` is non-empty and its last character is kSep.
//
// Marked inline because the definition lives in a header; without it every
// including translation unit would export its own copy of the symbol.
inline bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }

// Returns `v` as a std::string that ends with kSep. A separator is appended
// only when `v` is non-empty and does not already end with one, so an empty
// input is returned unchanged (still empty).
//
// Marked inline because the definition lives in a header; a non-inline
// definition would be duplicated across translation units.
inline std::string EnsureTrailingSlash(std::string_view v) {
  std::string result(v);
  if (!v.empty() && !HasTrailingSlash(v)) {
    // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
    // Unless the local filesystem always uses absolute paths
    result += kSep;
  }
  return result;
}

// Splits `s` at its last kSep and returns {parent, last component}.
//
// - No separator in `s`: the parent is empty and the second element is `s`.
// - Otherwise the parent is everything before the last separator and the
//   second element is everything after it. A trailing separator is not
//   stripped, so the second element is empty when `s` ends with kSep.
//
// Marked inline because the definition lives in a header; a non-inline
// definition would be duplicated across translation units.
inline std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
  // XXX should strip trailing slash?

  const auto pos = s.find_last_of(kSep);
  if (pos == std::string::npos) {
    // Empty parent
    return {{}, s};
  }
  return {s.substr(0, pos), s.substr(pos + 1)};
}

} // namespace milvus_storage
5 changes: 5 additions & 0 deletions cpp/include/milvus-storage/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Status {

// Creates a Status with code kWriterError that carries `msg`.
static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); }

// Creates a Status with code kIOError that carries `msg`.
static Status IOError(const std::string& msg) { return Status(kIOError, msg); }

// Returns true when the status code is kOk.
bool ok() const { return code_ == kOk; }

// Returns true when the status code is kArrowError.
bool IsArrowError() const { return code_ == kArrowError; }
Expand All @@ -49,6 +51,8 @@ class Status {

// Returns true when the status code is kWriterError.
bool IsWriterError() const { return code_ == kWriterError; }

// Returns true when the status code is kIOError.
bool IsIOError() const { return code_ == kIOError; }

std::string ToString() const;

private:
Expand All @@ -59,6 +63,7 @@ class Status {
kInternalStateError = 3,
kFileNotFound = 4,
kWriterError = 5,
kIOError = 6,
};

// Constructs a status from `code` and an optional message (defaults to the empty string).
// Declared explicit, so a bare Code value does not implicitly convert to a Status.
explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
15 changes: 8 additions & 7 deletions cpp/include/milvus-storage/filesystem/azure/azure_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,24 @@ class AzureFileSystemProducer : public FileSystemProducer {
public:
AzureFileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) override {
Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));

arrow::fs::AzureOptions options;
auto account = std::getenv("AZURE_STORAGE_ACCOUNT");
auto key = std::getenv("AZURE_SECRET_KEY");
if (account == nullptr || key == nullptr) {
auto account = storage_config.access_key_id;
auto key = storage_config.access_key_value;
if (account.empty() || key.empty()) {
return Status::InvalidArgument("Please provide azure storage account and azure secret key");
}
options.account_name = account;
RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(std::getenv("AZURE_SECRET_KEY")));
RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(key));

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
fs->CreateDir(*out_path);
return std::shared_ptr<arrow::fs::FileSystem>(fs);
}
};

} // namespace milvus_storage
} // namespace milvus_storage
7 changes: 5 additions & 2 deletions cpp/include/milvus-storage/filesystem/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
#include <memory>
#include <string>
#include "common/result.h"
#include "common/config.h"

namespace milvus_storage {

class FileSystemProducer {
public:
virtual ~FileSystemProducer() = default;

virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) = 0;
virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) = 0;

std::string UriToPath(const std::string& uri) {
arrow::util::Uri uri_parser;
Expand All @@ -41,7 +43,8 @@ class FileSystemProducer {

// Entry point for obtaining an arrow::fs::FileSystem. Only declarations appear here; the
// definitions of BuildFileSystem live outside this header.
class FileSystemFactory {
public:
// Overload taking a URI string. On success the Result holds a shared_ptr to the filesystem.
// NOTE(review): `out_path` looks like an output parameter for the path part of the URI --
// confirm against the definition.
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path);
// Overload taking a StorageConfig instead of a bare URI. On success the Result holds a
// shared_ptr to the filesystem.
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const StorageConfig& storage_config,
std::string* out_path);
};

} // namespace milvus_storage
26 changes: 26 additions & 0 deletions cpp/include/milvus-storage/filesystem/io/io_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <arrow/io/interfaces.h>
#include <arrow/status.h>
#include <arrow/util/thread_pool.h>
#include "common/log.h"

namespace milvus_storage {

// Submits a task to the executor owned by `io_context`.
//
// The task hints carry the context's external id, and the context's stop
// token is passed along with the perfectly-forwarded `submit_args`. The
// return value is whatever Executor::Submit returns.
template <typename... SubmitArgs>
auto SubmitIO(arrow::io::IOContext io_context, SubmitArgs&&... submit_args)
    -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
  arrow::internal::TaskHints task_hints;
  task_hints.external_id = io_context.external_id();

  auto* executor = io_context.executor();
  return executor->Submit(task_hints, io_context.stop_token(), std::forward<SubmitArgs>(submit_args)...);
}

// Closes `file` and, if Close() reports a failure, logs it through
// LOG_STORAGE_FATAL_ with a message that names the dynamic type of the file.
// Nothing is returned or thrown by this function itself.
// NOTE(review): whether LOG_STORAGE_FATAL_ terminates the process is not visible here -- confirm.
//
// `file` is dereferenced unconditionally and therefore must not be null.
//
// Marked inline because the definition lives in a header: a non-inline
// definition would be emitted by every translation unit that includes this
// file and collide at link time.
inline void CloseFromDestructor(arrow::io::FileInterface* file) {
  arrow::Status st = file->Close();
  if (!st.ok()) {
    auto file_type = typeid(*file).name();
    std::stringstream ss;
    ss << "When destroying file of type " << file_type << ": " << st.message();
    LOG_STORAGE_FATAL_ << st.WithMessage(ss.str());
  }
}

} // namespace milvus_storage
99 changes: 99 additions & 0 deletions cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>
#include <cstdlib>
#include "common/log.h"
#include "common/macro.h"

#include <arrow/util/key_value_metadata.h>
#include <arrow/filesystem/s3fs.h>
#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"
#include "arrow/io/interfaces.h"

// NOTE(review): header-scope using-directive; every includer of this header gets all of arrow::
// in scope. The unqualified Future<> in the class below relies on it, so removing it requires
// qualifying that name first.
using namespace arrow;
// Short names for the Arrow filesystem types used in the declarations below.
using ::arrow::fs::FileInfo;
using ::arrow::fs::FileInfoGenerator;

namespace milvus_storage {

// Subclass of arrow::fs::S3FileSystem that overrides the filesystem operations declared below and
// adds OpenOutputStreamWithUploadSize, which lets the caller pass a `part_size` when opening an
// output stream. Only declarations appear in this header; the definitions live elsewhere.
// NOTE(review): `part_size` is presumably the multipart-upload part size in bytes -- confirm
// against the implementation.
class MultiPartUploadS3FS : public arrow::fs::S3FileSystem {
public:
~MultiPartUploadS3FS() override;

// Identifies this filesystem type as "multiPartUploadS3".
std::string type_name() const override { return "multiPartUploadS3"; }

bool Equals(const FileSystem& other) const override;

arrow::Result<std::string> PathFromUri(const std::string& uri_string) const override;

// File metadata queries: single path, selector (eager), and selector (generator).
arrow::Result<FileInfo> GetFileInfo(const std::string& path) override;

arrow::Result<std::vector<arrow::fs::FileInfo>> GetFileInfo(const arrow::fs::FileSelector& select) override;

FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override;

// Directory and file mutation operations.
arrow::Status CreateDir(const std::string& path, bool recursive) override;

arrow::Status DeleteDir(const std::string& path) override;

arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;

Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override;

arrow::Status DeleteRootDirContents() override;

arrow::Status DeleteFile(const std::string& path) override;

arrow::Status Move(const std::string& src, const std::string& dest) override;

arrow::Status CopyFile(const std::string& src, const std::string& dest) override;

// Additions of this class (not overrides): open an output stream for `s` with a caller-supplied
// `part_size`, without or with key/value metadata.
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(const std::string& s,
int64_t part_size);

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(
const std::string& s, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata, int64_t part_size);

// Static factory. The constructor is protected, so code outside the class hierarchy cannot
// construct instances directly. The IOContext argument defaults to arrow::io::default_io_context().
static arrow::Result<std::shared_ptr<MultiPartUploadS3FS>> Make(
const arrow::fs::S3Options& options, const arrow::io::IOContext& = arrow::io::default_io_context());

// Stream and file opening overrides.
arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const FileInfo& info) override;

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& s) override;

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const FileInfo& info) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

protected:
explicit MultiPartUploadS3FS(const arrow::fs::S3Options& options, const arrow::io::IOContext& io_context);

// Implementation class, only forward-declared in this header, and the shared pointer to it.
// NOTE(review): if the base class also declares a member named impl_, this declaration hides
// it -- confirm whether the base member can be reused instead.
class Impl;
std::shared_ptr<Impl> impl_;
Comment on lines +95 to +96
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The S3FileSystem already has such a field, can we use that instead of redefine it again?

};

} // namespace milvus_storage
Loading
Loading