Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
951c093
enable client ssd offload and storage persistence
May 30, 2025
159442d
add storage_root_path in all tests setup() initialization and add rel…
Jun 3, 2025
b02d7bd
clean up headers and improve code readability - Added consistent Doxy…
Jun 3, 2025
9e25771
Revert "add storage_root_path in all tests setup() initialization and…
Jun 3, 2025
4d5f4dc
Restore the high-level API to its original state and modify it to int…
Jun 3, 2025
240e56a
add local_file_test and thread_pool_test
Jun 3, 2025
cd275dc
feat(client_ssd_offload): implement async writes and fix locking bugs
Jun 4, 2025
5443d6d
add support for remove , remove_all , isexist interface etc.
Jun 6, 2025
d78414e
feat(kvcache): implement cluster isolation with session IDs
Jun 9, 2025
6e93a21
edit two parameters client get, add persisitence path in client rathe…
Jun 10, 2025
7971c4c
merge main
Jun 10, 2025
91591b1
add support for batch api conflict , refactor replica.descriptor to s…
Jun 11, 2025
4b28a49
add test branch
Jun 11, 2025
13d8dc1
add ci ssd
Jun 11, 2025
1f00e2c
change python test
Jun 11, 2025
b915991
add log for fail
Jun 11, 2025
f77dd9d
change querykey return value type
Jun 11, 2025
e68318a
fix bug
Jun 11, 2025
4392df4
fix bug
Jun 11, 2025
7c78507
add sleep for removefile
Jun 11, 2025
3dfc8cd
fix sleep
Jun 11, 2025
ffc9e59
edit ci.yml and fix delete before write problem
Jun 11, 2025
fdd112d
add comment for storage_backend
Jun 11, 2025
36d9ae5
spell check
Jun 11, 2025
4ae5b00
fix conflict
Jun 11, 2025
c5cae7a
fix name problem and decrease errorcode for file
Jun 13, 2025
3f6be2b
fix conflict
Jun 13, 2025
d75a522
add pytest for ssd offload
Jun 16, 2025
8c83a02
edit test
Jun 16, 2025
f0138b3
edit test
Jun 16, 2025
68f0887
fix test
Jun 16, 2025
ad60b71
fix bug
Jun 16, 2025
273b9f2
fix test
Jun 16, 2025
c722b39
Modify the thread pool value capture to reference capture to fix the …
Jun 18, 2025
37151d7
fix conflict with upstream
Jun 24, 2025
3d1d5fd
add async getfrom file in batchget transfertask. delete file_storage_…
Jun 25, 2025
ab405ba
fix conflict
Jun 25, 2025
b772440
add support for HA in cluster_id subdirectory, change session_id to f…
Jun 26, 2025
f15ad63
Merge branch 'main' into client-ssd-persistence
Jun 26, 2025
a8adcd0
add persistence in batchput
Jun 26, 2025
f348dba
add disk allocate for get_into py interface
Jun 26, 2025
7768e29
fix bug
Jun 26, 2025
f2323bf
temp
Jun 27, 2025
1f048ab
fix bug in submit fileread task for std:move(slices)
Jun 30, 2025
4885ff7
edit querykey to return optional<descriptor>, add interface batchquer…
Jun 30, 2025
a1d527f
Merge branch 'main' into client-ssd-persistence
Jun 30, 2025
38df0bf
fix confict in batchget, add batchget/batchput test
Jun 30, 2025
87266fe
fix bug
Jun 30, 2025
01b2f0a
fix bug
Jun 30, 2025
ca41bab
comment batch test
Jun 30, 2025
d6b3c34
Merge branch 'main' into client-ssd-persistence
Jun 30, 2025
726859b
fix conflict and add batch_get_into file test
Jun 30, 2025
8ba5207
fix test bug
Jul 1, 2025
745888b
fix test bug
Jul 1, 2025
a4facc9
fix conflict
Jul 1, 2025
6cf962e
fix conflict
Jul 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ jobs:
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata MC_STORE_MEMCPY=false make test -j ARGS="-V"
shell: bash

- name: Test (in build env with ssd)
run: |
cd build
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
ldconfig -v || echo "always continue"
mkdir -p /tmp/mooncake_test_ssd
export MOONCAKE_STORAGE_ROOT_DIR=/tmp/mooncake_test_ssd
MC_METADATA_SERVER=http://127.0.0.1:8080/metadata make test -j ARGS="-V"
rm -rf /tmp/mooncake_test_ssd
shell: bash

- name: Stop Mooncake Master Service
run: |
pkill mooncake_master || true
Expand Down Expand Up @@ -169,13 +180,31 @@ jobs:
mooncake_http_metadata_server --port 8080 &
shell: bash

- name: Run tests with ssd
run: |
source test_env/bin/activate
mkdir -p /tmp/mooncake_test_ssd
export MOONCAKE_STORAGE_ROOT_DIR=/tmp/mooncake_test_ssd
MC_STORE_MEMCPY=false ./scripts/run_tests.sh
deactivate
rm -rf /tmp/mooncake_test_ssd
shell: bash

- name: Restart metadata server
run: |
PID=$(pgrep -f "mooncake_http_metadata_server"); kill -9 $PID
source test_env/bin/activate
mooncake_http_metadata_server --port 8080 &
shell: bash

- name: Run tests
run: |
source test_env/bin/activate
MC_STORE_MEMCPY=false ./scripts/run_tests.sh
deactivate
shell: bash


build-flags:
runs-on: ubuntu-22.04
strategy:
Expand Down
126 changes: 96 additions & 30 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,21 @@ int DistributedObjectStore::initAll(const std::string &protocol_,
buffer_allocator_size, protocol_, device_name);
}

int DistributedObjectStore::allocateSlices(std::vector<Slice> &slices,
size_t length) {
uint64_t offset = 0;
while (offset < length) {
auto chunk_size = std::min(length - offset, kMaxSliceSize);
auto ptr = client_buffer_allocator_->allocate(chunk_size);
if (!ptr) {
return 1; // SliceGuard will handle cleanup
}
slices.emplace_back(Slice{ptr, chunk_size});
offset += chunk_size;
}
return 0;
}

int DistributedObjectStore::allocateSlices(std::vector<Slice> &slices,
const std::string &value) {
uint64_t offset = 0;
Expand Down Expand Up @@ -302,15 +317,22 @@ int DistributedObjectStore::allocateSlices(
length = 0;
if (object_info.replica_list.empty()) return -1;
auto &replica = object_info.replica_list[0];
for (auto &handle : replica.buffer_descriptors) {
auto chunk_size = handle.size_;
assert(chunk_size <= kMaxSliceSize);
auto ptr = client_buffer_allocator_->allocate(chunk_size);
if (!ptr) {
return 1; // SliceGuard will handle cleanup
if(replica.is_memory_replica() == false) {
auto &disk_descriptor =replica.get_disk_descriptor();
length = disk_descriptor.file_size;
return allocateSlices(slices, length);
}else{
auto &memory_descriptors = replica.get_memory_descriptor();
for (auto &handle : memory_descriptors.buffer_descriptors) {
auto chunk_size = handle.size_;
assert(chunk_size <= kMaxSliceSize);
auto ptr = client_buffer_allocator_->allocate(chunk_size);
if (!ptr) {
return 1; // SliceGuard will handle cleanup
}
slices.emplace_back(Slice{ptr, chunk_size});
length += chunk_size;
}
slices.emplace_back(Slice{ptr, chunk_size});
length += chunk_size;
}
return 0;
}
Expand Down Expand Up @@ -355,15 +377,26 @@ int DistributedObjectStore::allocateBatchedSlices(
// Get first replica
auto &replica = object_info_it->second[0];
uint64_t length = 0;
for (auto &handle : replica.buffer_descriptors) {
auto chunk_size = handle.size_;
assert(chunk_size <= kMaxSliceSize);
auto ptr = client_buffer_allocator_->allocate(chunk_size);
if (!ptr) {

if(replica.is_memory_replica() == false) {
auto &disk_descriptor =replica.get_disk_descriptor();
length = disk_descriptor.file_size;
auto result = allocateSlices(batched_slices[key], length);
if(result) {
return 1;
}
batched_slices[key].emplace_back(Slice{ptr, chunk_size});
length += chunk_size;
}else{
auto &memory_descriptors = replica.get_memory_descriptor();
for (auto &handle : memory_descriptors.buffer_descriptors) {
auto chunk_size = handle.size_;
assert(chunk_size <= kMaxSliceSize);
auto ptr = client_buffer_allocator_->allocate(chunk_size);
if (!ptr) {
return 1;
}
batched_slices[key].emplace_back(Slice{ptr, chunk_size});
length += chunk_size;
}
}
str_length_map.emplace(key, length);
}
Expand Down Expand Up @@ -712,8 +745,14 @@ int64_t DistributedObjectStore::getSize(const std::string &key) {
int64_t total_size = 0;
if (!object_info.replica_list.empty()) {
auto &replica = object_info.replica_list[0];
for (auto &handle : replica.buffer_descriptors) {
total_size += handle.size_;
if(replica.is_memory_replica() == false) {
auto &disk_descriptor = replica.get_disk_descriptor();
total_size = disk_descriptor.file_size;
}else{
auto &memory_descriptors = replica.get_memory_descriptor();
for (auto &handle : memory_descriptors.buffer_descriptors) {
total_size += handle.size_;
}
}
} else {
LOG(ERROR) << "Internal error: object_info.replica_list_size() is 0";
Expand Down Expand Up @@ -850,8 +889,13 @@ int DistributedObjectStore::get_into(const std::string &key, void *buffer,
}

auto &replica = object_info.replica_list[0];
for (auto &handle : replica.buffer_descriptors) {
total_size += handle.size_;
if(replica.is_memory_replica() == false) {
auto &disk_descriptor = replica.get_disk_descriptor();
total_size = disk_descriptor.file_size;
}else{
for (auto &handle : replica.get_memory_descriptor().buffer_descriptors) {
total_size += handle.size_;
}
}

// Check if user buffer is large enough
Expand All @@ -865,11 +909,19 @@ int DistributedObjectStore::get_into(const std::string &key, void *buffer,
std::vector<mooncake::Slice> slices;
uint64_t offset = 0;

for (auto &handle : replica.buffer_descriptors) {
auto chunk_size = handle.size_;
void *chunk_ptr = static_cast<char *>(buffer) + offset;
slices.emplace_back(Slice{chunk_ptr, chunk_size});
offset += chunk_size;
if(replica.is_memory_replica() == false) {
while(offset < total_size){
auto chunk_size = std::min(total_size - offset, kMaxSliceSize);
void *chunk_ptr = static_cast<char *>(buffer) + offset;
slices.emplace_back(Slice{chunk_ptr, chunk_size});
offset += chunk_size;
}
}else{
for (auto &handle : replica.get_memory_descriptor().buffer_descriptors) {
void *chunk_ptr = static_cast<char *>(buffer) + offset;
slices.emplace_back(Slice{chunk_ptr, handle.size_});
offset += handle.size_;
}
}

// Step 3: Read data directly into user buffer
Expand Down Expand Up @@ -984,8 +1036,13 @@ std::vector<int> DistributedObjectStore::batch_get_into(

auto &replica = replica_list[0];
uint64_t total_size = 0;
for (auto &handle : replica.buffer_descriptors) {
total_size += handle.size_;
if(replica.is_memory_replica() == false) {
auto &disk_descriptor = replica.get_disk_descriptor();
total_size = disk_descriptor.file_size;
}else{
for (auto &handle : replica.get_memory_descriptor().buffer_descriptors) {
total_size += handle.size_;
}
}

if (sizes[i] < total_size) {
Expand All @@ -998,10 +1055,19 @@ std::vector<int> DistributedObjectStore::batch_get_into(

uint64_t offset = 0;
std::vector<mooncake::Slice> key_slices;
for (auto &handle : replica.buffer_descriptors) {
void *chunk_ptr = static_cast<char *>(buffers[i]) + offset;
key_slices.emplace_back(Slice{chunk_ptr, handle.size_});
offset += handle.size_;
if(replica.is_memory_replica() == false) {
while(offset < total_size){
auto chunk_size = std::min(total_size - offset, kMaxSliceSize);
void *chunk_ptr = static_cast<char *>(buffers[i]) + offset;
key_slices.emplace_back(Slice{chunk_ptr, chunk_size});
offset += chunk_size;
}
}else{
for (auto &handle : replica.get_memory_descriptor().buffer_descriptors) {
void *chunk_ptr = static_cast<char *>(buffers[i]) + offset;
key_slices.emplace_back(Slice{chunk_ptr, handle.size_});
offset += handle.size_;
}
}
all_slices[key] = key_slices;
results[i] = static_cast<int>(total_size);
Expand Down
3 changes: 3 additions & 0 deletions mooncake-integration/store/store_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ class DistributedObjectStore {
int64_t getSize(const std::string &key);

private:
int allocateSlices(std::vector<mooncake::Slice> &slices,
size_t length);

int allocateSlices(std::vector<mooncake::Slice> &slices,
const std::string &value);

Expand Down
36 changes: 27 additions & 9 deletions mooncake-store/include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "transfer_engine.h"
#include "transfer_task.h"
#include "types.h"
#include "thread_pool.h"
#include "storage_backend.h"

namespace mooncake {

Expand Down Expand Up @@ -95,9 +97,9 @@ class Client {
* @param slices Vector of slices to store the data
* @return ErrorCode indicating success/failure
*/
ErrorCode Get(const std::string& object_key, const ObjectInfo& object_info,
ErrorCode Get(const std::string& object_key, ObjectInfo& object_info,
std::vector<Slice>& slices);

/**
* @brief Transfers data using pre-queried object information
* @param object_keys Keys of the objects
Expand Down Expand Up @@ -204,7 +206,8 @@ class Client {
* @brief Private constructor to enforce creation through Create() method
*/
Client(const std::string& local_hostname,
const std::string& metadata_connstring);
const std::string& metadata_connstring,
const std::string& storage_root_dir);

/**
* @brief Internal helper functions for initialization and data transfer
Expand All @@ -215,26 +218,36 @@ class Client {
const std::string& protocol,
void** protocol_args);
ErrorCode TransferData(
const std::vector<AllocatedBuffer::Descriptor>& handles,
const Replica::Descriptor &replica,
std::vector<Slice>& slices, TransferRequest::OpCode op_code);
ErrorCode TransferWrite(
const std::vector<AllocatedBuffer::Descriptor>& handles,
const Replica::Descriptor &replica,
std::vector<Slice>& slices);
ErrorCode TransferRead(
const std::vector<AllocatedBuffer::Descriptor>& handles,
const Replica::Descriptor &replica,
std::vector<Slice>& slices);

/**
* @brief Prepare and use the storage backend for persisting data
*/
void PrepareStorageBackend(const std::string& storage_root_dir, const std::string& fsdir);

ErrorCode GetFromLocalFile(const std::string& object_key,
std::vector<Slice>& slices, ObjectInfo& object_info);

void PutToLocalFile(const std::string& object_key,
std::vector<Slice>& slices);

/**
* @brief Find the first complete replica from a replica list
* @param replica_list List of replicas to search through
* @param handles Output vector to store the buffer handles of the found
* replica
* @param replica the first complete replica (file or memory)
* @return ErrorCode::OK if found, ErrorCode::INVALID_REPLICA if no complete
* replica
*/
ErrorCode FindFirstCompleteReplica(
const std::vector<Replica::Descriptor>& replica_list,
std::vector<AllocatedBuffer::Descriptor>& handles);
Replica::Descriptor& replica);

// Core components
TransferEngine transfer_engine_;
Expand All @@ -248,6 +261,11 @@ class Client {
// Configuration
const std::string local_hostname_;
const std::string metadata_connstring_;
const std::string storage_root_dir_;

// Client persistent thread pool for async operations
ThreadPool write_thread_pool_;
std::shared_ptr<StorageBackend> storage_backend_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the thread pool into the StorageManager? since it's meant for the StorageManager, not the client.


// For high availability
MasterViewHelper master_view_helper_;
Expand Down
5 changes: 4 additions & 1 deletion mooncake-store/include/ha_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class MasterServiceSupervisor {
std::chrono::steady_clock::duration rpc_conn_timeout =
std::chrono::seconds(
0), // Client connection timeout. 0 = no timeout (infinite)
bool rpc_enable_tcp_no_delay = true);
bool rpc_enable_tcp_no_delay = true,
const std::string& cluster_id = DEFAULT_CLUSTER_ID);
int Start();
~MasterServiceSupervisor();

Expand Down Expand Up @@ -115,6 +116,8 @@ class MasterServiceSupervisor {

// Local hostname for leader election
std::string local_hostname_;

std::string cluster_id_;
};

} // namespace mooncake
Expand Down
Loading
Loading