Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions mooncake-common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ option(BUILD_UNIT_TESTS "Build uint tests" ON)
option(USE_CUDA "option for enabling gpu features" OFF)
option(USE_NVMEOF "option for using NVMe over Fabric" OFF)
option(USE_TCP "option for using TCP transport" ON)
option(USE_ASCEND "option for using npu" OFF)
option(USE_ASCEND "option for using npu" ON)
option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF)
option(USE_CXL "option for using CXL protocol" OFF)
option(USE_ETCD "option for enable etcd as metadata server" OFF)
option(USE_ETCD_LEGACY "option for enable etcd based on etcd-cpp-api-v3" OFF)
option(USE_REDIS "option for enable redis as metadata server" OFF)
option(USE_HTTP "option for enable http as metadata server" ON)
option(USE_HTTP "option for enable http as metadata server" OFF)
option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF)
option(WITH_METRICS "enable metrics and metrics reporting thread" ON)
option(USE_3FS "option for using 3FS storage backend" OFF)
Expand Down Expand Up @@ -115,7 +115,13 @@ if (USE_ASCEND)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DOPEN_BUILD_PROJECT ")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DOPEN_BUILD_PROJECT ")

file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
if("$ENV{ASCEND_TOOLKIT_PATH}" STREQUAL "")
set(ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
else()
set(ASCEND_TOOLKIT_ROOT $ENV{ASCEND_TOOLKIT_PATH})
endif()

file(GLOB ASCEND_TOOLKIT_ROOT "${ASCEND_TOOLKIT_ROOT}")
set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
add_compile_definitions(USE_ASCEND)
Expand Down
281 changes: 261 additions & 20 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ tl::expected<void, ErrorCode> DistributedObjectStore::setup_internal(
} else {
this->local_hostname = local_hostname;
}

LOG(ERROR) << "setup_internal local_hostname:" << this->local_hostname;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This log message uses LOG(ERROR) for what appears to be a debug/informational message. Using ERROR level for non-error conditions can clutter logs and make it harder to find real errors. Please consider changing this to LOG(INFO) or VLOG(1).

Suggested change
LOG(ERROR) << "setup_internal local_hostname:" << this->local_hostname;
LOG(INFO) << "setup_internal local_hostname:" << this->local_hostname;

void **args = (protocol == "rdma") ? rdma_args(rdma_devices) : nullptr;
auto client_opt =
mooncake::Client::Create(this->local_hostname, metadata_server,
Expand All @@ -164,24 +164,6 @@ tl::expected<void, ErrorCode> DistributedObjectStore::setup_internal(
return tl::unexpected(ErrorCode::INVALID_PARAMS);
}
client_ = *client_opt;

// Local_buffer_size is allowed to be 0, but we only register memory when
// local_buffer_size > 0. Invoke ibv_reg_mr() with size=0 is UB, and may
// fail in some rdma implementations.
client_buffer_allocator_ = ClientBufferAllocator::create(local_buffer_size);
if (local_buffer_size > 0) {
auto result = client_->RegisterLocalMemory(
client_buffer_allocator_->getBase(), local_buffer_size,
kWildcardLocation, false, true);
if (!result.has_value()) {
LOG(ERROR) << "Failed to register local memory: "
<< toString(result.error());
return tl::unexpected(result.error());
}
} else {
LOG(INFO) << "Local buffer size is 0, skip registering local memory";
}

// If global_segment_size is 0, skip mount segment;
// If global_segment_size is larger than max_mr_size, split to multiple
// segments.
Expand Down Expand Up @@ -1000,6 +982,21 @@ std::vector<int> DistributedObjectStore::batch_put_from(
return results;
}

// Stores the data held in `buffers` under the single key `key` and reports
// the outcome as plain integers.
//
// The work is delegated to batch_put_from_internal_ascend(); every entry of
// its result vector is then translated with to_py_ret(), preserving order.
//
// key:     object key to write.
// buffers: source buffer addresses.
// sizes:   byte size of each buffer, parallel to `buffers`.
// config:  replication settings forwarded unchanged to the internal call.
std::vector<int> DistributedObjectStore::batch_put_from_ascend(
    const std::string key, const std::vector<void *> &buffers,
    const std::vector<size_t> &sizes, const ReplicateConfig &config) {
    const auto outcomes =
        batch_put_from_internal_ascend(key, buffers, sizes, config);

    std::vector<int> status_codes;
    status_codes.reserve(outcomes.size());
    for (size_t idx = 0; idx < outcomes.size(); ++idx) {
        status_codes.push_back(to_py_ret(outcomes[idx]));
    }
    return status_codes;
}

std::vector<int> DistributedObjectStore::batch_get_into(
const std::vector<std::string> &keys, const std::vector<void *> &buffers,
const std::vector<size_t> &sizes) {
Expand All @@ -1014,6 +1011,25 @@ std::vector<int> DistributedObjectStore::batch_get_into(
return results;
}

// Reads the object stored under the single key `key` into `buffers` and
// reports the outcome as plain integers.
//
// The work is delegated to batch_get_into_internal_ascend(); every entry of
// its result vector is then translated with to_py_ret(), preserving order.
//
// key:     object key to read.
// buffers: destination buffer addresses.
// sizes:   byte size of each buffer, parallel to `buffers`.
std::vector<int> DistributedObjectStore::batch_get_into_ascend(
    const std::string key, const std::vector<void *> &buffers,
    const std::vector<size_t> &sizes) {
    const auto outcomes = batch_get_into_internal_ascend(key, buffers, sizes);

    std::vector<int> status_codes;
    status_codes.reserve(outcomes.size());
    for (size_t idx = 0; idx < outcomes.size(); ++idx) {
        status_codes.push_back(to_py_ret(outcomes[idx]));
    }
    return status_codes;
}

tl::expected<void, ErrorCode> DistributedObjectStore::put_from_internal(
const std::string &key, void *buffer, size_t size,
const ReplicateConfig &config) {
Expand Down Expand Up @@ -1201,6 +1217,152 @@ DistributedObjectStore::batch_get_into_internal(
return results;
}

std::vector<tl::expected<int64_t, ErrorCode>>
DistributedObjectStore::batch_get_into_internal_ascend(
const std::string key, const std::vector<void *> &buffers,
const std::vector<size_t> &sizes) {
// LOG(INFO) << "GET KEY start: " << key;
// Validate preconditions
if (!client_) {
LOG(ERROR) << "Client is not initialized";
return std::vector<tl::expected<int64_t, ErrorCode>>(
1, tl::unexpected(ErrorCode::INVALID_PARAMS));
}

if (buffers.size() != sizes.size()) {
LOG(ERROR) << "Input vector sizes mismatch: keys=" << 1
<< ", buffers=" << buffers.size()
<< ", sizes=" << sizes.size();
return std::vector<tl::expected<int64_t, ErrorCode>>(
1, tl::unexpected(ErrorCode::INVALID_PARAMS));
}

const size_t num_keys = 1;
std::vector<tl::expected<int64_t, ErrorCode>> results;
results.reserve(num_keys);

if (num_keys == 0) {
return results;
Comment on lines +1240 to +1245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The code const size_t num_keys = 1; followed by if (num_keys == 0) is dead code since num_keys is a compile-time constant and the condition will always be false. This should be removed to improve code clarity.

    const size_t num_keys = 1;
    std::vector<tl::expected<int64_t, ErrorCode>> results;
    results.reserve(num_keys);

}
std::vector<std::string> keys;
keys.reserve(1);
keys.emplace_back(key);
// Query metadata for all keys
const auto query_results = client_->BatchQuery(keys);

// Process each key individually and prepare for batch transfer
struct ValidKeyInfo {
std::string key;
size_t original_index;
std::vector<Replica::Descriptor> replica_list;
std::vector<Slice> slices;
uint64_t total_size;
};

std::vector<ValidKeyInfo> valid_operations;
valid_operations.reserve(num_keys);

for (size_t i = 0; i < num_keys; ++i) {
const auto &key = keys[i];

// Handle query failures
if (!query_results[i]) {
const auto error = query_results[i].error();
results.emplace_back(tl::unexpected(error));
if (error != ErrorCode::OBJECT_NOT_FOUND) {
LOG(ERROR) << "Query failed for key '" << key
<< "': " << toString(error);
}
continue;
}

// Validate replica list
auto replica_list = query_results[i].value();
if (replica_list.empty()) {
LOG(ERROR) << "Empty replica list for key: " << key;
results.emplace_back(tl::unexpected(ErrorCode::INVALID_REPLICA));
continue;
}

// Calculate required buffer size
const auto &replica = replica_list[0];
uint64_t total_size = calculate_total_size(replica);
int total_key_size = 0;
for (size_t k = 0; k < sizes.size(); ++k) {
total_key_size += sizes[k];
}
// LOG(INFO) << "KEY: '" << key
// << "': required=" << total_size
// << ", available=" << total_key_size;
// Validate buffer capacity
if (total_key_size < total_size) {
LOG(ERROR) << "Buffer too small for key '" << key
<< "': required=" << total_size
<< ", available=" << total_key_size;
results.emplace_back(tl::unexpected(ErrorCode::INVALID_PARAMS));
continue;
}
std::vector<Slice> key_slices;
// Create slices for this key's buffer
for (size_t j = 0; j < buffers.size(); ++j) {
uint64_t offset = 0;
if (replica.is_memory_replica() == false) {
key_slices.emplace_back(Slice{buffers[j], sizes[j]});
} else {
key_slices.emplace_back(Slice{buffers[j], sizes[j]});
}
Comment on lines +1309 to +1313
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The if/else block here has identical bodies. This is redundant and can be simplified to a single statement.

            key_slices.emplace_back(Slice{buffers[j], sizes[j]});

}

// Store operation info for batch processing
valid_operations.push_back({.key = key,
.original_index = i,
.replica_list = std::move(replica_list),
.slices = std::move(key_slices),
.total_size = total_size});

// Set success result (actual bytes transferred)
results.emplace_back(static_cast<int64_t>(total_size));
}

// Early return if no valid operations
if (valid_operations.empty()) {
return results;
}

// Prepare batch transfer data structures
std::vector<std::string> batch_keys;
std::vector<std::vector<Replica::Descriptor>> batch_replica_lists;
std::unordered_map<std::string, std::vector<Slice>> batch_slices;

batch_keys.reserve(valid_operations.size());
batch_replica_lists.reserve(valid_operations.size());

for (const auto &op : valid_operations) {
batch_keys.push_back(op.key);
batch_replica_lists.push_back(op.replica_list);
batch_slices[op.key] = op.slices;
}

// Execute batch transfer
const auto batch_get_results =
client_->BatchGet(batch_keys, batch_replica_lists, batch_slices);

// Process transfer results
for (size_t j = 0; j < batch_get_results.size(); ++j) {
const auto &op = valid_operations[j];

if (!batch_get_results[j]) {
const auto error = batch_get_results[j].error();
LOG(ERROR) << "BatchGet failed for key '" << op.key
<< "': " << toString(error);
results[op.original_index] = tl::unexpected(error);
}
}
// LOG(INFO) << "GET KEY end: " << key << "end";

return results;
}

std::vector<tl::expected<void, ErrorCode>>
DistributedObjectStore::batch_put_from_internal(
const std::vector<std::string> &keys, const std::vector<void *> &buffers,
Expand Down Expand Up @@ -1255,6 +1417,47 @@ DistributedObjectStore::batch_put_from_internal(
return client_->BatchPut(keys, ordered_batched_slices, config);
}

// Stores the data held in `buffers` under the single key `key`.
//
// The buffers are wrapped, in order, as slices; the key and its slice list
// are then submitted to Client::BatchPut as a one-element batch.
//
// key:     object key to write.
// buffers: source buffer addresses; one slice is built per entry.
// sizes:   byte size of each buffer; must have as many entries as `buffers`.
// config:  replication settings forwarded unchanged to BatchPut.
//
// Returns a one-element vector holding INVALID_PARAMS when the client is
// not initialized or the vectors differ in length; otherwise whatever
// Client::BatchPut returns.
std::vector<tl::expected<void, ErrorCode>>
DistributedObjectStore::batch_put_from_internal_ascend(
    const std::string key, const std::vector<void *> &buffers,
    const std::vector<size_t> &sizes, const ReplicateConfig &config) {
    LOG(INFO) << "PUT KEY start: " << key;
    if (!client_) {
        LOG(ERROR) << "Client is not initialized";
        return std::vector<tl::expected<void, ErrorCode>>(
            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
    }

    if (buffers.size() != sizes.size()) {
        LOG(ERROR) << "Mismatched sizes for key, buffers, and sizes";
        return std::vector<tl::expected<void, ErrorCode>>(
            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
    }

    // Create slices from user buffers
    std::vector<mooncake::Slice> slices;
    slices.reserve(buffers.size());
    for (size_t i = 0; i < buffers.size(); ++i) {
        slices.emplace_back(Slice{buffers[i], sizes[i]});
    }
    // Remember the count for the log line below; `slices` is moved from.
    const size_t slice_count = slices.size();

    // The client API is batch oriented, so wrap the key and its slices in
    // one-element batches. Moving avoids copying the slice vector.
    std::vector<std::vector<mooncake::Slice>> ordered_batched_slices;
    ordered_batched_slices.reserve(1);
    ordered_batched_slices.emplace_back(std::move(slices));

    std::vector<std::string> keys;
    keys.reserve(1);
    keys.emplace_back(key);

    // Informational only: this runs on the success path, so it must not be
    // reported at ERROR severity.
    LOG(INFO) << "batch put keys size:" << keys.size()
              << ", ordered_batched_slices size:"
              << ordered_batched_slices.size()
              << ", slice size len:" << slice_count;

    // Call client BatchPut and return the vector<expected> directly
    return client_->BatchPut(keys, ordered_batched_slices, config);
}

std::vector<tl::expected<bool, ErrorCode>>
DistributedObjectStore::batchIsExist_internal(
const std::vector<std::string> &keys) {
Expand Down Expand Up @@ -1778,7 +1981,45 @@ PYBIND11_MODULE(store, m) {
},
py::arg("keys"), py::arg("values"),
py::arg("config") = ReplicateConfig{})
.def("get_hostname", &DistributedObjectStore::get_hostname);
.def("get_hostname", &DistributedObjectStore::get_hostname)
.def(
"batch_put_from_ascend",
[](DistributedObjectStore &self,
const std::string key,
const std::vector<uintptr_t> &buffer_ptrs,
const std::vector<size_t> &sizes,
const ReplicateConfig &config = ReplicateConfig{}) {
std::vector<void *> buffers;
buffers.reserve(buffer_ptrs.size());
for (uintptr_t ptr : buffer_ptrs) {
buffers.push_back(reinterpret_cast<void *>(ptr));
}
py::gil_scoped_release release;
return self.batch_put_from_ascend(key, buffers, sizes, config);
},
py::arg("keys"), py::arg("buffer_ptrs"), py::arg("sizes"),
py::arg("config") = ReplicateConfig{},
"Put object data directly from pre-allocated buffers for "
"multiple "
"keys")
.def(
"batch_get_into_ascend",
[](DistributedObjectStore &self,
const std::string key,
const std::vector<uintptr_t> &buffer_ptrs,
const std::vector<size_t> &sizes) {
std::vector<void *> buffers;
buffers.reserve(buffer_ptrs.size());
for (uintptr_t ptr : buffer_ptrs) {
buffers.push_back(reinterpret_cast<void *>(ptr));
}
py::gil_scoped_release release;
return self.batch_get_into_ascend(key, buffers, sizes);
},
py::arg("keys"), py::arg("buffer_ptrs"), py::arg("sizes"),
"Get object data directly into pre-allocated buffers for "
"multiple "
"keys");
}

} // namespace mooncake
Loading