Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mooncake-common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ option(USE_REDIS "option for enable redis as metadata server" OFF)
# Build-time feature switches; the last argument of each option() is its default value.
option(USE_HTTP "option for enable http as metadata server" ON)
option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF)
option(WITH_METRICS "enable metrics and metrics reporting thread" ON)
# Opt-in switch for the 3FS storage backend (disabled by default).
option(USE_3FS "option for using 3FS storage backend" OFF)


# Opt-in switch for LRU in the master service (disabled by default).
option(USE_LRU_MASTER "option for using LRU in master service" OFF)
Expand Down Expand Up @@ -139,6 +140,10 @@ if (WITH_METRICS)
message(STATUS "metrics is enabled")
endif()

# When the USE_3FS option is ON, define the USE_3FS preprocessor symbol for
# compiled sources (so code guarded by `#ifdef USE_3FS` is built) and report
# the choice in the configure log.
if(USE_3FS)
add_compile_definitions(USE_3FS)
message(STATUS "3FS storage backend is enabled")
endif()

# NOTE(review): presumably consumed by the gflags package config to export
# namespaced targets - confirm against the gflags CMake documentation.
set(GFLAGS_USE_TARGET_NAMESPACE "true")
# yaml-cpp is a hard requirement: configuration fails if it cannot be found.
find_package(yaml-cpp REQUIRED)
Expand Down
7 changes: 2 additions & 5 deletions mooncake-store/include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,8 @@ class Client {
void PrepareStorageBackend(const std::string& storage_root_dir,
const std::string& fsdir);

ErrorCode GetFromLocalFile(const std::string& object_key,
std::vector<Slice>& slices,
std::vector<Replica::Descriptor>& replicas);

void PutToLocalFile(const std::string& object_key,
std::vector<Slice>& slices);
const std::vector<Slice>& slices);

/**
* @brief Find the first complete replica from a replica list
Expand All @@ -255,6 +251,7 @@ class Client {
// Batch-put helpers. All but CollectResults take the operation list by
// mutable reference; CollectResults takes it by const reference and returns
// one tl::expected<void, ErrorCode> per result.
// NOTE(review): presumably these run as consecutive stages of one batch put
// (submit -> wait -> finalize -> persist -> collect) - confirm in the
// implementation file.
void SubmitTransfers(std::vector<PutOperation>& ops);
void WaitForTransfers(std::vector<PutOperation>& ops);
void FinalizeBatchPut(std::vector<PutOperation>& ops);
// NOTE(review): spelled `BatchPuttoLocalFile` (lower-case "to") while the
// sibling declaration is `PutToLocalFile`; consider aligning the
// capitalization in a follow-up rename.
void BatchPuttoLocalFile(std::vector<PutOperation>& ops);
std::vector<tl::expected<void, ErrorCode>> CollectResults(
const std::vector<PutOperation>& ops);

Expand Down
162 changes: 162 additions & 0 deletions mooncake-store/include/file_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#pragma once

#include <sys/file.h>
#include <sys/uio.h>

#include <atomic>
#include <cerrno>
#include <cstdio>
#include <span>
#include <string>
#include <thread>
#include <unordered_map>

#include "types.h"

namespace mooncake {
/**
 * @brief Scoped advisory lock on a file descriptor, implemented with flock(2).
 *
 * The constructor blocks until a shared (READ) or exclusive (WRITE) lock is
 * obtained or flock() fails; the destructor releases the lock if it was
 * obtained. The descriptor itself is never opened or closed by this class.
 *
 * Callers must check is_locked() after construction: a failed flock() is
 * reported only through that flag.
 */
class FileLockRAII {
public:
    enum class LockType { READ, WRITE };

    /**
     * @brief Acquire a lock of the requested type on @p fd.
     * @param fd   Descriptor to lock (not owned).
     * @param type READ takes LOCK_SH, WRITE takes LOCK_EX.
     */
    FileLockRAII(int fd, LockType type) : fd_(fd), locked_(false) {
        const int operation = (type == LockType::READ) ? LOCK_SH : LOCK_EX;
        int rc = 0;
        // A blocking flock() can be interrupted by a signal handler and fail
        // with EINTR even though the lock is still obtainable; retry so a
        // stray signal is not reported as "could not lock".
        do {
            rc = flock(fd_, operation);
        } while (rc != 0 && errno == EINTR);
        locked_ = (rc == 0);
    }

    /// Releases the lock if this object currently holds it.
    ~FileLockRAII() {
        if (locked_) {
            flock(fd_, LOCK_UN);
        }
    }

    // A lock has exactly one owner: copying is forbidden.
    FileLockRAII(const FileLockRAII&) = delete;
    FileLockRAII& operator=(const FileLockRAII&) = delete;

    /// Transfers ownership of the lock; @p other will no longer unlock.
    FileLockRAII(FileLockRAII&& other) noexcept
        : fd_(other.fd_), locked_(other.locked_) {
        other.locked_ = false;
    }

    // Move assignment was never available (it would have to release the
    // currently held lock first); state that explicitly.
    FileLockRAII& operator=(FileLockRAII&&) = delete;

    /// @return true if the lock was acquired and is still owned by this object.
    bool is_locked() const { return locked_; }

private:
    int fd_;       // Descriptor the lock applies to (not owned).
    bool locked_;  // True while this object is responsible for unlocking.
};

/**
* @class LocalFile
* @brief RAII wrapper for file operations with thread-safe locking support
*
* Provides thread-safe file I/O operations including read/write and vectorized I/O.
* Implements proper resource management through RAII pattern.
*/
class StorageFile {
public:

StorageFile(const std::string &filename, int fd)
: filename_(filename), fd_(fd), error_code_(ErrorCode::OK), is_locked_(false) {}
/**
* @brief Destructor
* @note Automatically closes the file and releases resources
*/
virtual ~StorageFile() = default;

/**
* @brief Writes data from buffer to file
* @param buffer Input buffer containing data to write
* @param length Number of bytes to write
* @return tl::expected<size_t, ErrorCode> containing number of bytes written on success, or ErrorCode on failure
* @note Thread-safe operation with write locking
*/
virtual tl::expected<size_t, ErrorCode> write(const std::string &buffer, size_t length) = 0;

/**
* @brief Writes data from buffer to file
* @param data Input span containing data to write
* @param length Number of bytes to write
* @return tl::expected<size_t, ErrorCode> containing number of bytes written on success, or ErrorCode on failure
* @note Thread-safe operation with write locking
*/
virtual tl::expected<size_t, ErrorCode> write(std::span<const char> data, size_t length) = 0;

/**
* @brief Reads data from file into buffer
* @param buffer Output buffer for read data
* @param length Maximum number of bytes to read
* @return tl::expected<size_t, ErrorCode> containing number of bytes read on success, or ErrorCode on failure
* @note Thread-safe operation with read locking
*/
virtual tl::expected<size_t, ErrorCode> read(std::string &buffer, size_t length) = 0;

/**
* @brief Scattered write at specified file offset
* @param iov Array of I/O vectors
* @param iovcnt Number of elements in iov array
* @param offset File offset to write at
* @return tl::expected<size_t, ErrorCode> containing total bytes written on success, or ErrorCode on failure
* @note Thread-safe operation with write locking
*/
virtual tl::expected<size_t, ErrorCode> vector_write(const iovec *iov, int iovcnt, off_t offset) = 0;

/**
* @brief Scattered read from specified file offset
* @param iov Array of I/O vectors
* @param iovcnt Number of elements in iov array
* @param offset File offset to read from
* @return tl::expected<size_t, ErrorCode> containing total bytes read on success, or ErrorCode on failure
* @note Thread-safe operation with read locking
*/
virtual tl::expected<size_t, ErrorCode> vector_read(const iovec *iov, int iovcnt, off_t offset) = 0;

template<typename T>
tl::expected<T, ErrorCode> make_error(ErrorCode code) {
error_code_ = code;
return tl::make_unexpected(code);
}

/**
* @brief file locking mechanism
*/
FileLockRAII acquire_write_lock() {
return FileLockRAII(fd_, FileLockRAII::LockType::WRITE);
}

FileLockRAII acquire_read_lock() {
return FileLockRAII(fd_, FileLockRAII::LockType::READ);
}

/**
* @brief Gets the current error code
* @return Current error code
*/
ErrorCode get_error_code(){
return error_code_;
}

protected:
std::string filename_;
int fd_;
ErrorCode error_code_{ErrorCode::OK};
std::atomic<bool> is_locked_{false};
};

/**
 * @brief StorageFile back-end declared for plain POSIX file descriptors.
 *
 * Overrides every I/O entry point of StorageFile; the definitions live
 * outside this header.
 */
class PosixFile : public StorageFile {
public:
    /// @param filename Name of the file; @param fd descriptor used for I/O.
    PosixFile(const std::string &filename, int fd);

    /// Defined out of line. NOTE(review): presumably closes the descriptor - confirm in the implementation.
    ~PosixFile() override;

    // --- Sequential I/O -------------------------------------------------
    tl::expected<size_t, ErrorCode> write(const std::string &buffer,
                                          size_t length) override;
    tl::expected<size_t, ErrorCode> write(std::span<const char> data,
                                          size_t length) override;
    tl::expected<size_t, ErrorCode> read(std::string &buffer,
                                         size_t length) override;

    // --- Vectorized I/O at an explicit offset ---------------------------
    tl::expected<size_t, ErrorCode> vector_write(const iovec *iov,
                                                 int iovcnt,
                                                 off_t offset) override;
    tl::expected<size_t, ErrorCode> vector_read(const iovec *iov,
                                                int iovcnt,
                                                off_t offset) override;
};

} // namespace mooncake

#ifdef USE_3FS
#include <hf3fs/hf3fs.h>
#endif


101 changes: 101 additions & 0 deletions mooncake-store/include/hf3fs/hf3fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once

#include <string>
#include <unordered_map>
#include <mutex>
#include <thread>
#include <hf3fs_usrbio.h>
#include "types.h"

namespace mooncake {

class StorageFile;

// Tunable parameters for the 3FS (hf3fs) USRBIO back-end. Every field has a
// default, so a default-constructed Hf3fsConfig is usable as-is.
struct Hf3fsConfig {
// 3FS cluster related parameters (none are defined at present)

// USRBIO related parameters
std::string mount_root = "/"; // Mount point root directory (defaults to "/")
size_t iov_size = 32 << 20; // Shared memory size in bytes (32 << 20 = 32 MiB)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, if value size bigger than 32MB, what will happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation handles values exceeding iov_size by splitting the operation into multiple read-and-copy iterations within a loop (e.g., for 64MB data, it performs two passes to read into the iov and copy to slices).

size_t ior_entries = 16; // Maximum number of requests in the I/O ring
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the batch size is greater than 16, what will happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each thread has its own USRBIO resources (iov, ior, etc.), so ior is separeted in batchget now. Besides in the current implementation, only one I/O request is submitted to the ior at a time, waiting for completion before submitting the next, thus avoiding ior overflow (splitting 32MB into 4*8MB I/O requests showed no significant performance gain in local tests, so this approach was not adopted).

// io_depth controls how USRBIO batches prepared I/O requests:
//   = 0: no I/O-depth control; prepared requests are issued to the server ASAP.
//   > 0: requests are issued as a batch only once `io_depth` of them are queued.
//   < 0: USRBIO waits for at most `-io_depth` queued requests and issues them in one batch.
// NOTE(review): the field below is size_t (unsigned), so the negative mode
// described above cannot be expressed with it - confirm the intended type
// against the USRBIO API.
size_t io_depth = 0; // I/O batch processing depth (see modes above)
int ior_timeout = 0; // I/O timeout in milliseconds
};

class USRBIOResourceManager {
public:

USRBIOResourceManager() {}

void setDefaultParams(const Hf3fsConfig& config) {
default_config_ = config;
}

struct ThreadUSRBIOResource* getThreadResource(
const Hf3fsConfig &config);

struct ThreadUSRBIOResource* getThreadResource() {
return getThreadResource(default_config_);
}

~USRBIOResourceManager();


private:
USRBIOResourceManager(const USRBIOResourceManager &) = delete;
USRBIOResourceManager &operator=(const USRBIOResourceManager &) = delete;
Hf3fsConfig default_config_;

// Thread resources map protection lock
std::mutex resource_map_mutex;

// ThreadID to resource mapping
std::unordered_map<std::thread::id, struct ThreadUSRBIOResource *>
thread_resources;
};

// Thread level USRBIO resource structure.
//
// Bundles one hf3fs I/O vector and two I/O rings (read and write) together
// with the configuration they were created from. The destructor calls
// Cleanup(), so an instance must have exactly one owner: copying or moving
// it would make two objects clean up the same handles, and is therefore
// disabled.
struct ThreadUSRBIOResource {
    // USRBIO resources; value-initialized so they never hold indeterminate
    // data before Initialize() runs.
    struct hf3fs_iov iov_{};
    struct hf3fs_ior ior_read_{};
    struct hf3fs_ior ior_write_{};

    // Resource initialization status
    bool initialized{false};

    // Configuration this resource belongs to
    Hf3fsConfig config_;

    ThreadUSRBIOResource() = default;

    // Single-owner type: see the note on the struct.
    ThreadUSRBIOResource(const ThreadUSRBIOResource &) = delete;
    ThreadUSRBIOResource &operator=(const ThreadUSRBIOResource &) = delete;
    ThreadUSRBIOResource(ThreadUSRBIOResource &&) = delete;
    ThreadUSRBIOResource &operator=(ThreadUSRBIOResource &&) = delete;

    // Initialize resource (defined out of line); returns a success flag.
    bool Initialize(const Hf3fsConfig &config);

    // Cleanup resource (defined out of line).
    void Cleanup();

    // Always runs Cleanup() on destruction.
    ~ThreadUSRBIOResource() { Cleanup(); }
};

/**
 * @brief StorageFile back-end declared for 3FS, using a USRBIOResourceManager.
 *
 * Overrides every I/O entry point of StorageFile; the definitions live
 * outside this header.
 *
 * @note Deriving from StorageFile requires its full definition, not just a
 *       forward declaration, so the header that defines StorageFile must
 *       already have been processed when this class is compiled.
 */
class ThreeFSFile : public StorageFile {
public:
    /**
     * @param filename         Name of the file.
     * @param fd               Descriptor of the file.
     * @param resource_manager Manager used by this file; held as a raw,
     *                         non-owning pointer member.
     */
    ThreeFSFile(const std::string &filename, int fd,
                USRBIOResourceManager* resource_manager);

    /// Defined out of line.
    ~ThreeFSFile() override;

    // --- Sequential I/O -------------------------------------------------
    tl::expected<size_t, ErrorCode> write(const std::string &buffer,
                                          size_t length) override;
    tl::expected<size_t, ErrorCode> write(std::span<const char> data,
                                          size_t length) override;
    tl::expected<size_t, ErrorCode> read(std::string &buffer,
                                         size_t length) override;

    // --- Vectorized I/O at an explicit offset ---------------------------
    tl::expected<size_t, ErrorCode> vector_write(const iovec *iov,
                                                 int iovcnt,
                                                 off_t offset) override;
    tl::expected<size_t, ErrorCode> vector_read(const iovec *iov,
                                                int iovcnt,
                                                off_t offset) override;

private:
    USRBIOResourceManager* resource_manager_;  // Not owned by this object.
};

}
Loading
Loading