Skip to content

Commit

Permalink
custom object output stream
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
  • Loading branch information
shaoting-huang committed Oct 2, 2024
1 parent c1e0166 commit abee34d
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 138 deletions.
3 changes: 2 additions & 1 deletion cpp/include/milvus-storage/common/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <sys/types.h>
#include <unistd.h>
#include "glog/logging.h"
#include "arrow/util/logging.h"

// namespace milvus {

Expand Down Expand Up @@ -116,7 +117,7 @@
#define STORAGE_IGNORE_EXPR(expr) ((void)(expr))
#define STORAGE_CHECK_OR_LOG(condition) \
STORAGE_PREDICT_TRUE(condition) \
? STORAGE_IGNORE_EXPR(0) : LOG_STORAGE_FATAL_ << " Check failed: " #condition " "
? arrow::util::Voidify() & STORAGE_IGNORE_EXPR(0) : LOG_STORAGE_FATAL_ << " Check failed: " #condition " "

/////////////////////////////////////////////////////////////////////////////////////////////////

Expand Down
54 changes: 27 additions & 27 deletions cpp/include/milvus-storage/common/path_util.h
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// // Copyright 2024 Zilliz
// //
// // Licensed under the Apache License, Version 2.0 (the "License");
// // you may not use this file except in compliance with the License.
// // You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing, software
// // distributed under the License is distributed on an "AS IS" BASIS,
// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// // See the License for the specific language governing permissions and
// // limitations under the License.

#pragma once
// #pragma once

#include <string>
#include "common/status.h"
// #include <string>
// #include "common/status.h"

namespace milvus_storage {
// namespace milvus_storage {

constexpr char kSep = '/';
// constexpr char kSep = '/';

Status NotAFile(std::string_view path) { return Status::IOError("Not a regular file: '", path, "'"); }
// Status NotAFile(std::string_view path) { return Status::IOError("Not a regular file: " + std::string(path)); }

bool HasTrailingSlash(std::string s) { return !s.empty() && s.back() == kSep; }
// bool HasTrailingSlash(std::string s) { return !s.empty() && s.back() == kSep; }

Status AssertNoTrailingSlash(std::string key) {
if (HasTrailingSlash(key)) {
return NotAFile(key);
}
return Status::OK();
}
// Status AssertNoTrailingSlash(std::string key) {
// if (HasTrailingSlash(key)) {
// return NotAFile(key);
// }
// return Status::OK();
// }

} // namespace milvus_storage
// } // namespace milvus_storage
5 changes: 5 additions & 0 deletions cpp/include/milvus-storage/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Status {

static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); }

static Status IOError(const std::string& msg) { return Status(kIOError, msg); }

bool ok() const { return code_ == kOk; }

bool IsArrowError() const { return code_ == kArrowError; }
Expand All @@ -49,6 +51,8 @@ class Status {

bool IsWriterError() const { return code_ == kWriterError; }

bool IsIOError() const { return code_ == kIOError; }

std::string ToString() const;

private:
Expand All @@ -59,6 +63,7 @@ class Status {
kInternalStateError = 3,
kFileNotFound = 4,
kWriterError = 5,
kIOError = 6,
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string>
#include <vector>

#include <arrow/util/key_value_metadata.h>
#include <arrow/filesystem/s3fs.h>
#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
Expand All @@ -31,7 +32,7 @@ class MultiPartUploadS3FS : public arrow::fs::FileSystem {
: options_(options), part_size_(part_size) {}

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata) override;
const std::string& s, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

private:
const int64_t part_size_;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/filesystem/io/io_util.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <arrow/io/interface.h>
#include <arrow/io/interfaces.h>
#include <arrow/util/thread_pool.h>

namespace milvus_storage {

template <typename... SubmitArgs>
auto SubmitIO(arrow::io::IOContext io_context, SubmitArgs&&... submit_args)
-> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
::arrow::internal::TaskHints hints;
arrow::internal::TaskHints hints;
hints.external_id = io_context.external_id();
return io_context.executor()->Submit(hints, io_context.stop_token(), std::forward<SubmitArgs>(submit_args)...);
};
Expand Down
Loading

0 comments on commit abee34d

Please sign in to comment.