
Commit 8fce1ff

[Huawei]ascend_transport kv pool
1 parent fc1eb07 commit 8fce1ff

36 files changed, +3695 -1621 lines changed

mooncake-common/common.cmake

Lines changed: 9 additions & 3 deletions
@@ -59,13 +59,13 @@ option(BUILD_UNIT_TESTS "Build uint tests" ON)
 option(USE_CUDA "option for enabling gpu features" OFF)
 option(USE_NVMEOF "option for using NVMe over Fabric" OFF)
 option(USE_TCP "option for using TCP transport" ON)
-option(USE_ASCEND "option for using npu" OFF)
+option(USE_ASCEND "option for using npu" ON)
 option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF)
 option(USE_CXL "option for using CXL protocol" OFF)
 option(USE_ETCD "option for enable etcd as metadata server" OFF)
 option(USE_ETCD_LEGACY "option for enable etcd based on etcd-cpp-api-v3" OFF)
 option(USE_REDIS "option for enable redis as metadata server" OFF)
-option(USE_HTTP "option for enable http as metadata server" ON)
+option(USE_HTTP "option for enable http as metadata server" OFF)
 option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF)
 option(WITH_METRICS "enable metrics and metrics reporting thread" ON)
 option(USE_3FS "option for using 3FS storage backend" OFF)
@@ -115,7 +115,13 @@ if (USE_ASCEND)
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DOPEN_BUILD_PROJECT ")
     set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DOPEN_BUILD_PROJECT ")
 
-    file(GLOB ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
+    if("$ENV{ASCEND_TOOLKIT_PATH}" STREQUAL "")
+        set(ASCEND_TOOLKIT_ROOT "/usr/local/Ascend/ascend-toolkit/latest/*-linux")
+    else()
+        set(ASCEND_TOOLKIT_ROOT $ENV{ASCEND_TOOLKIT_PATH})
+    endif()
+
+    file(GLOB ASCEND_TOOLKIT_ROOT "${ASCEND_TOOLKIT_ROOT}")
     set(ASCEND_LIB_DIR "${ASCEND_TOOLKIT_ROOT}/lib64")
     set(ASCEND_INCLUDE_DIR "${ASCEND_TOOLKIT_ROOT}/include")
     add_compile_definitions(USE_ASCEND)
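
Note: with this hunk, a non-default CANN toolkit install can be selected at configure time through the ASCEND_TOOLKIT_PATH environment variable instead of editing common.cmake; when the variable is unset, the previous /usr/local/Ascend/ascend-toolkit/latest/*-linux glob is still used. A minimal configure sketch follows; the toolkit path, the build directory, and driving cmake from Python's subprocess are illustrative assumptions, not values taken from this commit.

    # Illustrative only: both paths below are placeholders.
    import os
    import subprocess

    env = dict(os.environ)
    env["ASCEND_TOOLKIT_PATH"] = "/opt/Ascend/ascend-toolkit/latest/aarch64-linux"  # placeholder
    subprocess.run(
        ["cmake", "-S", ".", "-B", "build", "-DUSE_ASCEND=ON"],  # USE_ASCEND now defaults to ON
        env=env,
        check=True,
    )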

mooncake-integration/store/store_py.cpp

Lines changed: 261 additions & 20 deletions
@@ -154,7 +154,7 @@ tl::expected<void, ErrorCode> DistributedObjectStore::setup_internal(
     } else {
         this->local_hostname = local_hostname;
     }
-
+    LOG(ERROR) << "setup_internal local_hostname:" << this->local_hostname;
     void **args = (protocol == "rdma") ? rdma_args(rdma_devices) : nullptr;
     auto client_opt =
         mooncake::Client::Create(this->local_hostname, metadata_server,
@@ -164,24 +164,6 @@ tl::expected<void, ErrorCode> DistributedObjectStore::setup_internal(
         return tl::unexpected(ErrorCode::INVALID_PARAMS);
     }
     client_ = *client_opt;
-
-    // Local_buffer_size is allowed to be 0, but we only register memory when
-    // local_buffer_size > 0. Invoke ibv_reg_mr() with size=0 is UB, and may
-    // fail in some rdma implementations.
-    client_buffer_allocator_ = ClientBufferAllocator::create(local_buffer_size);
-    if (local_buffer_size > 0) {
-        auto result = client_->RegisterLocalMemory(
-            client_buffer_allocator_->getBase(), local_buffer_size,
-            kWildcardLocation, false, true);
-        if (!result.has_value()) {
-            LOG(ERROR) << "Failed to register local memory: "
-                       << toString(result.error());
-            return tl::unexpected(result.error());
-        }
-    } else {
-        LOG(INFO) << "Local buffer size is 0, skip registering local memory";
-    }
-
     // If global_segment_size is 0, skip mount segment;
     // If global_segment_size is larger than max_mr_size, split to multiple
     // segments.
@@ -1000,6 +982,21 @@ std::vector<int> DistributedObjectStore::batch_put_from(
     return results;
 }
 
+std::vector<int> DistributedObjectStore::batch_put_from_ascend(
+    const std::string key, const std::vector<void *> &buffers,
+    const std::vector<size_t> &sizes, const ReplicateConfig &config) {
+    auto internal_results =
+        batch_put_from_internal_ascend(key, buffers, sizes, config);
+    std::vector<int> results;
+    results.reserve(internal_results.size());
+
+    for (const auto &result : internal_results) {
+        results.push_back(to_py_ret(result));
+    }
+
+    return results;
+}
+
 std::vector<int> DistributedObjectStore::batch_get_into(
     const std::vector<std::string> &keys, const std::vector<void *> &buffers,
     const std::vector<size_t> &sizes) {
@@ -1014,6 +1011,25 @@ std::vector<int> DistributedObjectStore::batch_get_into(
     return results;
 }
 
+std::vector<int> DistributedObjectStore::batch_get_into_ascend(
+    const std::string key, const std::vector<void *> &buffers,
+    const std::vector<size_t> &sizes) {
+    // auto start = std::chrono::high_resolution_clock::now();
+
+    auto internal_results = batch_get_into_internal_ascend(key, buffers, sizes);
+    std::vector<int> results;
+    results.reserve(internal_results.size());
+
+    for (const auto &result : internal_results) {
+        results.push_back(to_py_ret(result));
+    }
+    // auto stop = std::chrono::high_resolution_clock::now();
+    // auto duration_call =
+    //     std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
+    // LOG(INFO) << "key: " << key << ", batch_get_into_ascend: " << duration_call.count() << "us";
+    return results;
+}
+
 tl::expected<void, ErrorCode> DistributedObjectStore::put_from_internal(
     const std::string &key, void *buffer, size_t size,
     const ReplicateConfig &config) {
@@ -1201,6 +1217,152 @@ DistributedObjectStore::batch_get_into_internal(
     return results;
 }
 
+std::vector<tl::expected<int64_t, ErrorCode>>
+DistributedObjectStore::batch_get_into_internal_ascend(
+    const std::string key, const std::vector<void *> &buffers,
+    const std::vector<size_t> &sizes) {
+    // LOG(INFO) << "GET KEY start: " << key;
+    // Validate preconditions
+    if (!client_) {
+        LOG(ERROR) << "Client is not initialized";
+        return std::vector<tl::expected<int64_t, ErrorCode>>(
+            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
+    }
+
+    if (buffers.size() != sizes.size()) {
+        LOG(ERROR) << "Input vector sizes mismatch: keys=" << 1
+                   << ", buffers=" << buffers.size()
+                   << ", sizes=" << sizes.size();
+        return std::vector<tl::expected<int64_t, ErrorCode>>(
+            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
+    }
+
+    const size_t num_keys = 1;
+    std::vector<tl::expected<int64_t, ErrorCode>> results;
+    results.reserve(num_keys);
+
+    if (num_keys == 0) {
+        return results;
+    }
+    std::vector<std::string> keys;
+    keys.reserve(1);
+    keys.emplace_back(key);
+    // Query metadata for all keys
+    const auto query_results = client_->BatchQuery(keys);
+
+    // Process each key individually and prepare for batch transfer
+    struct ValidKeyInfo {
+        std::string key;
+        size_t original_index;
+        std::vector<Replica::Descriptor> replica_list;
+        std::vector<Slice> slices;
+        uint64_t total_size;
+    };
+
+    std::vector<ValidKeyInfo> valid_operations;
+    valid_operations.reserve(num_keys);
+
+    for (size_t i = 0; i < num_keys; ++i) {
+        const auto &key = keys[i];
+
+        // Handle query failures
+        if (!query_results[i]) {
+            const auto error = query_results[i].error();
+            results.emplace_back(tl::unexpected(error));
+            if (error != ErrorCode::OBJECT_NOT_FOUND) {
+                LOG(ERROR) << "Query failed for key '" << key
+                           << "': " << toString(error);
+            }
+            continue;
+        }
+
+        // Validate replica list
+        auto replica_list = query_results[i].value();
+        if (replica_list.empty()) {
+            LOG(ERROR) << "Empty replica list for key: " << key;
+            results.emplace_back(tl::unexpected(ErrorCode::INVALID_REPLICA));
+            continue;
+        }
+
+        // Calculate required buffer size
+        const auto &replica = replica_list[0];
+        uint64_t total_size = calculate_total_size(replica);
+        int total_key_size = 0;
+        for (size_t k = 0; k < sizes.size(); ++k) {
+            total_key_size += sizes[k];
+        }
+        // LOG(INFO) << "KEY: '" << key
+        //           << "': required=" << total_size
+        //           << ", available=" << total_key_size;
+        // Validate buffer capacity
+        if (total_key_size < total_size) {
+            LOG(ERROR) << "Buffer too small for key '" << key
+                       << "': required=" << total_size
+                       << ", available=" << total_key_size;
+            results.emplace_back(tl::unexpected(ErrorCode::INVALID_PARAMS));
+            continue;
+        }
+        std::vector<Slice> key_slices;
+        // Create slices for this key's buffer
+        for (size_t j = 0; j < buffers.size(); ++j) {
+            uint64_t offset = 0;
+            if (replica.is_memory_replica() == false) {
+                key_slices.emplace_back(Slice{buffers[j], sizes[j]});
+            } else {
+                key_slices.emplace_back(Slice{buffers[j], sizes[j]});
+            }
+        }
+
+        // Store operation info for batch processing
+        valid_operations.push_back({.key = key,
+                                    .original_index = i,
+                                    .replica_list = std::move(replica_list),
+                                    .slices = std::move(key_slices),
+                                    .total_size = total_size});
+
+        // Set success result (actual bytes transferred)
+        results.emplace_back(static_cast<int64_t>(total_size));
+    }
+
+    // Early return if no valid operations
+    if (valid_operations.empty()) {
+        return results;
+    }
+
+    // Prepare batch transfer data structures
+    std::vector<std::string> batch_keys;
+    std::vector<std::vector<Replica::Descriptor>> batch_replica_lists;
+    std::unordered_map<std::string, std::vector<Slice>> batch_slices;
+
+    batch_keys.reserve(valid_operations.size());
+    batch_replica_lists.reserve(valid_operations.size());
+
+    for (const auto &op : valid_operations) {
+        batch_keys.push_back(op.key);
+        batch_replica_lists.push_back(op.replica_list);
+        batch_slices[op.key] = op.slices;
+    }
+
+    // Execute batch transfer
+    const auto batch_get_results =
+        client_->BatchGet(batch_keys, batch_replica_lists, batch_slices);
+
+    // Process transfer results
+    for (size_t j = 0; j < batch_get_results.size(); ++j) {
+        const auto &op = valid_operations[j];
+
+        if (!batch_get_results[j]) {
+            const auto error = batch_get_results[j].error();
+            LOG(ERROR) << "BatchGet failed for key '" << op.key
+                       << "': " << toString(error);
+            results[op.original_index] = tl::unexpected(error);
+        }
+    }
+    // LOG(INFO) << "GET KEY end: " << key << "end";
+
+    return results;
+}
+
 std::vector<tl::expected<void, ErrorCode>>
 DistributedObjectStore::batch_put_from_internal(
     const std::vector<std::string> &keys, const std::vector<void *> &buffers,
@@ -1255,6 +1417,47 @@ DistributedObjectStore::batch_put_from_internal(
     return client_->BatchPut(keys, ordered_batched_slices, config);
 }
 
+std::vector<tl::expected<void, ErrorCode>>
+DistributedObjectStore::batch_put_from_internal_ascend(
+    const std::string key, const std::vector<void *> &buffers,
+    const std::vector<size_t> &sizes, const ReplicateConfig &config) {
+    LOG(INFO) << "PUT KEY start: " << key;
+    if (!client_) {
+        LOG(ERROR) << "Client is not initialized";
+        return std::vector<tl::expected<void, ErrorCode>>(
+            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
+    }
+
+    if (buffers.size() != sizes.size()) {
+        LOG(ERROR) << "Mismatched sizes for key, buffers, and sizes";
+        return std::vector<tl::expected<void, ErrorCode>>(
+            1, tl::unexpected(ErrorCode::INVALID_PARAMS));
+    }
+
+    // std::unordered_map<std::string, std::vector<mooncake::Slice>> all_slices;
+
+    // Create slices from user buffers
+    std::vector<mooncake::Slice> slices;
+    slices.reserve(buffers.size());
+    for (size_t i = 0; i < buffers.size(); ++i) {
+        void *buffer = buffers[i];
+        size_t size = sizes[i];
+        slices.emplace_back(Slice{buffer, size});
+    }
+    std::vector<std::vector<mooncake::Slice>> ordered_batched_slices;
+    ordered_batched_slices.reserve(1);
+    ordered_batched_slices.emplace_back(slices);
+
+    std::vector<std::string> keys;
+    keys.reserve(1);
+    keys.emplace_back(key);
+    LOG(ERROR) << "batch put keys size:" << keys.size() << ", ordered_batched_slices size:" << ordered_batched_slices.size()
+               << ", slice size len:" << slices.size();
+
+    // Call client BatchPut and return the vector<expected> directly
+    return client_->BatchPut(keys, ordered_batched_slices, config);
+}
+
 std::vector<tl::expected<bool, ErrorCode>>
 DistributedObjectStore::batchIsExist_internal(
     const std::vector<std::string> &keys) {
@@ -1778,7 +1981,45 @@ PYBIND11_MODULE(store, m) {
             },
             py::arg("keys"), py::arg("values"),
             py::arg("config") = ReplicateConfig{})
-        .def("get_hostname", &DistributedObjectStore::get_hostname);
+        .def("get_hostname", &DistributedObjectStore::get_hostname)
+        .def(
+            "batch_put_from_ascend",
+            [](DistributedObjectStore &self,
+               const std::string key,
+               const std::vector<uintptr_t> &buffer_ptrs,
+               const std::vector<size_t> &sizes,
+               const ReplicateConfig &config = ReplicateConfig{}) {
+                std::vector<void *> buffers;
+                buffers.reserve(buffer_ptrs.size());
+                for (uintptr_t ptr : buffer_ptrs) {
+                    buffers.push_back(reinterpret_cast<void *>(ptr));
+                }
+                py::gil_scoped_release release;
+                return self.batch_put_from_ascend(key, buffers, sizes, config);
+            },
+            py::arg("keys"), py::arg("buffer_ptrs"), py::arg("sizes"),
+            py::arg("config") = ReplicateConfig{},
+            "Put object data directly from pre-allocated buffers for "
+            "multiple "
+            "keys")
+        .def(
+            "batch_get_into_ascend",
+            [](DistributedObjectStore &self,
+               const std::string key,
+               const std::vector<uintptr_t> &buffer_ptrs,
+               const std::vector<size_t> &sizes) {
+                std::vector<void *> buffers;
+                buffers.reserve(buffer_ptrs.size());
+                for (uintptr_t ptr : buffer_ptrs) {
+                    buffers.push_back(reinterpret_cast<void *>(ptr));
+                }
+                py::gil_scoped_release release;
+                return self.batch_get_into_ascend(key, buffers, sizes);
+            },
+            py::arg("keys"), py::arg("buffer_ptrs"), py::arg("sizes"),
+            "Get object data directly into pre-allocated buffers for "
+            "multiple "
+            "keys");
 }
 
 } // namespace mooncake
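
Note: the two new bindings take a single key plus a list of pre-allocated buffer addresses and their sizes, release the GIL around the transfer, and return one integer status code per internal result; on the get path the combined size of the destination buffers must be at least the stored object size (see the "Buffer too small" check above). A usage sketch follows; the Python import path, the store setup, and the torch_npu-based buffer allocation are assumptions for illustration, not taken from this commit.

    # Illustrative only: import path, setup arguments, and NPU tensor allocation
    # are assumed; the call signatures mirror the pybind11 .def() entries above.
    import torch  # assumes torch plus torch_npu for the "npu" device
    from mooncake.store import MooncakeDistributedStore  # assumed import path

    store = MooncakeDistributedStore()
    # store.setup(...)  # metadata server / protocol arguments elided here

    key = "req42/layer0"  # hypothetical KV-cache key
    blocks = [torch.empty(4096, dtype=torch.uint8, device="npu") for _ in range(4)]
    ptrs = [t.data_ptr() for t in blocks]  # raw device addresses as integers
    sizes = [t.numel() * t.element_size() for t in blocks]

    put_status = store.batch_put_from_ascend(key, ptrs, sizes)  # list of int codes
    get_status = store.batch_get_into_ascend(key, ptrs, sizes)  # sum(sizes) must cover the object

Because the lambdas convert the integer addresses and then run the transfer with the GIL released, the caller should keep the backing tensors alive until the call returns.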
