diff --git a/mooncake-common/common.cmake b/mooncake-common/common.cmake index 983909972..13cfd2dc1 100644 --- a/mooncake-common/common.cmake +++ b/mooncake-common/common.cmake @@ -67,6 +67,7 @@ option(USE_REDIS "option for enable redis as metadata server" OFF) option(USE_HTTP "option for enable http as metadata server" ON) option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the transfer engine" OFF) option(WITH_METRICS "enable metrics and metrics reporting thread" ON) +option(USE_3FS "option for using 3FS storage backend" OFF) option(USE_LRU_MASTER "option for using LRU in master service" OFF) @@ -139,6 +140,10 @@ if (WITH_METRICS) message(STATUS "metrics is enabled") endif() +if(USE_3FS) + add_compile_definitions(USE_3FS) + message(STATUS "3FS storage backend is enabled") +endif() set(GFLAGS_USE_TARGET_NAMESPACE "true") find_package(yaml-cpp REQUIRED) diff --git a/mooncake-store/include/client.h b/mooncake-store/include/client.h index 350332c45..7cf8fb482 100644 --- a/mooncake-store/include/client.h +++ b/mooncake-store/include/client.h @@ -226,12 +226,8 @@ class Client { void PrepareStorageBackend(const std::string& storage_root_dir, const std::string& fsdir); - ErrorCode GetFromLocalFile(const std::string& object_key, - std::vector& slices, - std::vector& replicas); - void PutToLocalFile(const std::string& object_key, - std::vector& slices); + const std::vector& slices); /** * @brief Find the first complete replica from a replica list @@ -255,6 +251,7 @@ class Client { void SubmitTransfers(std::vector& ops); void WaitForTransfers(std::vector& ops); void FinalizeBatchPut(std::vector& ops); + void BatchPuttoLocalFile(std::vector& ops); std::vector> CollectResults( const std::vector& ops); diff --git a/mooncake-store/include/file_interface.h b/mooncake-store/include/file_interface.h new file mode 100644 index 000000000..e3aefddf1 --- /dev/null +++ b/mooncake-store/include/file_interface.h @@ -0,0 +1,162 @@ +#pragma once + +#include +#include +#include +#include +#include "types.h" +#include +#include +#include + +namespace mooncake { +class FileLockRAII { +public: + enum class LockType { READ, WRITE }; + + FileLockRAII(int fd, LockType type) : fd_(fd), locked_(false) { + if (type == LockType::READ) { + locked_ = (flock(fd_, LOCK_SH) == 0); + } else { + locked_ = (flock(fd_, LOCK_EX) == 0); + } + } + + ~FileLockRAII() { + if (locked_) { + flock(fd_, LOCK_UN); + } + } + + + FileLockRAII(const FileLockRAII&) = delete; + FileLockRAII& operator=(const FileLockRAII&) = delete; + + FileLockRAII(FileLockRAII&& other) noexcept + : fd_(other.fd_), locked_(other.locked_) { + other.locked_ = false; + } + + bool is_locked() const { return locked_; } + +private: + int fd_; + bool locked_; +}; + +/** + * @class LocalFile + * @brief RAII wrapper for file operations with thread-safe locking support + * + * Provides thread-safe file I/O operations including read/write and vectorized I/O. + * Implements proper resource management through RAII pattern. + */ +class StorageFile { +public: + + StorageFile(const std::string &filename, int fd) + : filename_(filename), fd_(fd), error_code_(ErrorCode::OK), is_locked_(false) {} + /** + * @brief Destructor + * @note Automatically closes the file and releases resources + */ + virtual ~StorageFile() = default; + + /** + * @brief Writes data from buffer to file + * @param buffer Input buffer containing data to write + * @param length Number of bytes to write + * @return tl::expected containing number of bytes written on success, or ErrorCode on failure + * @note Thread-safe operation with write locking + */ + virtual tl::expected write(const std::string &buffer, size_t length) = 0; + + /** + * @brief Writes data from buffer to file + * @param data Input span containing data to write + * @param length Number of bytes to write + * @return tl::expected containing number of bytes written on success, or ErrorCode on failure + * @note Thread-safe operation with write locking + */ + virtual tl::expected write(std::span data, size_t length) = 0; + + /** + * @brief Reads data from file into buffer + * @param buffer Output buffer for read data + * @param length Maximum number of bytes to read + * @return tl::expected containing number of bytes read on success, or ErrorCode on failure + * @note Thread-safe operation with read locking + */ + virtual tl::expected read(std::string &buffer, size_t length) = 0; + + /** + * @brief Scattered write at specified file offset + * @param iov Array of I/O vectors + * @param iovcnt Number of elements in iov array + * @param offset File offset to write at + * @return tl::expected containing total bytes written on success, or ErrorCode on failure + * @note Thread-safe operation with write locking + */ + virtual tl::expected vector_write(const iovec *iov, int iovcnt, off_t offset) = 0; + + /** + * @brief Scattered read from specified file offset + * @param iov Array of I/O vectors + * @param iovcnt Number of elements in iov array + * @param offset File offset to read from + * @return tl::expected containing total bytes read on success, or ErrorCode on failure + * @note Thread-safe operation with read locking + */ + virtual tl::expected vector_read(const iovec *iov, int iovcnt, off_t offset) = 0; + + template + tl::expected make_error(ErrorCode code) { + error_code_ = code; + return tl::make_unexpected(code); + } + + /** + * @brief file locking mechanism + */ + FileLockRAII acquire_write_lock() { + return FileLockRAII(fd_, FileLockRAII::LockType::WRITE); + } + + FileLockRAII acquire_read_lock() { + return FileLockRAII(fd_, FileLockRAII::LockType::READ); + } + + /** + * @brief Gets the current error code + * @return Current error code + */ + ErrorCode get_error_code(){ + return error_code_; + } + +protected: + std::string filename_; + int fd_; + ErrorCode error_code_{ErrorCode::OK}; + std::atomic is_locked_{false}; +}; + +class PosixFile : public StorageFile { +public: + PosixFile(const std::string &filename, int fd); + ~PosixFile() override; + + tl::expected write(const std::string &buffer, size_t length) override; + tl::expected write(std::span data, size_t length) override; + tl::expected read(std::string &buffer, size_t length) override; + tl::expected vector_write(const iovec *iov, int iovcnt, off_t offset) override; + tl::expected vector_read(const iovec *iov, int iovcnt, off_t offset) override; +}; + +} // namespace mooncake + +#ifdef USE_3FS +#include +#endif + + diff --git a/mooncake-store/include/hf3fs/hf3fs.h b/mooncake-store/include/hf3fs/hf3fs.h new file mode 100644 index 000000000..48b6f01bf --- /dev/null +++ b/mooncake-store/include/hf3fs/hf3fs.h @@ -0,0 +1,101 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "types.h" + +namespace mooncake { + +class StorageFile; + +// Forward declaration of USRBIOResourceManager +struct Hf3fsConfig { + // 3FS cluster related parameters + + // USRBIO related parameters + std::string mount_root = "/"; // Mount point root directory + size_t iov_size = 32 << 20; // Shared memory size (32MB) + size_t ior_entries = 16; // Maximum number of requests in IO ring + //`0` for no control with I/O depth. + // If greater than 0, then only when `io_depth` I/O requests are in queue, they will be issued to server as a batch. + // If smaller than 0, then USRBIO will wait for at most `-io_depth` I/O requests are in queue and issue them in one batch. + // If io_depth is 0, then USRBIO will issue all the prepared I/O requests to server ASAP. + size_t io_depth = 0; // IO batch processing depth + int ior_timeout = 0; // IO timeout (milliseconds) +}; + +class USRBIOResourceManager { +public: + + USRBIOResourceManager() {} + + void setDefaultParams(const Hf3fsConfig& config) { + default_config_ = config; + } + + struct ThreadUSRBIOResource* getThreadResource( + const Hf3fsConfig &config); + + struct ThreadUSRBIOResource* getThreadResource() { + return getThreadResource(default_config_); + } + + ~USRBIOResourceManager(); + + +private: + USRBIOResourceManager(const USRBIOResourceManager &) = delete; + USRBIOResourceManager &operator=(const USRBIOResourceManager &) = delete; + Hf3fsConfig default_config_; + + // Thread resources map protection lock + std::mutex resource_map_mutex; + + // ThreadID to resource mapping + std::unordered_map + thread_resources; +}; + +// Thread level USRBIO resource structure +struct ThreadUSRBIOResource { + // USRBIO resources + struct hf3fs_iov iov_; + struct hf3fs_ior ior_read_; + struct hf3fs_ior ior_write_; + + // Resource initialization status + bool initialized; + + // Resource belongs to parameters + Hf3fsConfig config_; + + ThreadUSRBIOResource() : initialized(false) {} + + // Initialize resource + bool Initialize(const Hf3fsConfig &config); + + // Cleanup resource + void Cleanup(); + + ~ThreadUSRBIOResource() { Cleanup(); } +}; + +class ThreeFSFile : public StorageFile { +public: + ThreeFSFile(const std::string &filename, int fd, USRBIOResourceManager* resource_manager); + ~ThreeFSFile() override; + + tl::expected write(const std::string &buffer, size_t length) override; + tl::expected write(std::span data, size_t length) override; + tl::expected read(std::string &buffer, size_t length) override; + tl::expected vector_write(const iovec *iov, int iovcnt, off_t offset) override; + tl::expected vector_read(const iovec *iov, int iovcnt, off_t offset) override; + +private: + USRBIOResourceManager* resource_manager_; +}; + +} \ No newline at end of file diff --git a/mooncake-store/include/local_file.h b/mooncake-store/include/local_file.h deleted file mode 100644 index 0fe59bce6..000000000 --- a/mooncake-store/include/local_file.h +++ /dev/null @@ -1,97 +0,0 @@ -#pragma once - -#include -#include -#include -#include "types.h" -#include - -namespace mooncake { - -/** - * @class LocalFile - * @brief RAII wrapper for file operations with thread-safe locking support - * - * Provides thread-safe file I/O operations including read/write and vectorized I/O. - * Implements proper resource management through RAII pattern. - */ -class LocalFile { -public: - /** - * @brief Constructs a LocalFile instance - * @param filename Path to the file being managed - * @param file FILE pointer to the opened file - * @param ec Initial error code (defaults to OK) - * @note Takes ownership of the FILE pointer - */ - explicit LocalFile(const std::string& filename, FILE *file, ErrorCode ec = ErrorCode::OK); - - /** - * @brief Destructor - * @note Automatically closes the file and releases resources - */ - ~LocalFile(); - - /** - * @brief Reads data from file into buffer - * @param buffer Output buffer for read data - * @param length Maximum number of bytes to read - * @return Number of bytes read on success, -1 on error - * @note Thread-safe operation with read locking - */ - ssize_t read(std::string &buffer, size_t length); - - /** - * @brief Writes data from buffer to file - * @param buffer Input buffer containing data to write - * @param length Number of bytes to write - * @return Number of bytes written on success, -1 on error - * @note Thread-safe operation with write locking - */ - ssize_t write(const std::string &buffer, size_t length); - - /** - * @brief Scattered read from specified file offset - * @param iov Array of I/O vectors - * @param iovcnt Number of elements in iov array - * @param offset File offset to read from - * @return Total bytes read on success, -1 on error - * @note Thread-safe operation with read locking - */ - ssize_t preadv(const iovec *iov, int iovcnt, off_t offset); - - /** - * @brief Scattered write at specified file offset - * @param iov Array of I/O vectors - * @param iovcnt Number of elements in iov array - * @param offset File offset to write at - * @return Total bytes written on success, -1 on error - * @note Thread-safe operation with write locking - */ - ssize_t pwritev(const iovec *iov, int iovcnt, off_t offset); - - /** - * @brief Gets the current error code - * @return Current error code - */ - ErrorCode get_error_code(){ - return error_code_; - } - -private: - /** - * @brief file locking mechanism - */ - int acquire_write_lock(); - int acquire_read_lock(); - int release_lock(); - - std::string filename_; - FILE *file_; - ErrorCode error_code_; - std::atomic is_locked_{false}; -}; - -} // namespace mooncake - - diff --git a/mooncake-store/include/storage_backend.h b/mooncake-store/include/storage_backend.h index 8941a8bc3..eb01efd34 100644 --- a/mooncake-store/include/storage_backend.h +++ b/mooncake-store/include/storage_backend.h @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include @@ -25,7 +25,18 @@ class StorageBackend { * @param fsdir subdirectory name * @note Directory existence is not checked in constructor */ - explicit StorageBackend(const std::string& root_dir, const std::string& fsdir): root_dir_(root_dir), fsdir_(fsdir) {} + #ifdef USE_3FS + explicit StorageBackend(const std::string& root_dir, const std::string& fsdir, bool is_3fs_dir) + : root_dir_(root_dir), fsdir_(fsdir), is_3fs_dir_(is_3fs_dir) { + resource_manager_ = std::make_unique(); + Hf3fsConfig config; + config.mount_root = root_dir; + resource_manager_->setDefaultParams(config); + } + #else + explicit StorageBackend(const std::string& root_dir, const std::string& fsdir) + : root_dir_(root_dir), fsdir_(fsdir) {} + #endif /** * @brief Factory method to create a StorageBackend instance @@ -49,49 +60,67 @@ class StorageBackend { LOG(INFO) << "FSDIR cannot be empty"; return nullptr; } + + fs::path root_path(root_dir); + std::string real_fsdir = "moon_" + fsdir; + #ifdef USE_3FS + bool is_3fs_dir = fs::exists(root_path / "3fs-virt") && + fs::is_directory(root_path / "3fs-virt"); + return std::make_shared(root_dir, real_fsdir, is_3fs_dir); + #else return std::make_shared(root_dir, real_fsdir); + #endif } /** * @brief Stores an object composed of multiple slices * @param key Object identifier * @param slices Vector of data slices to store - * @return ErrorCode indicating operation status + * @return tl::expected indicating operation status */ - ErrorCode StoreObject(const ObjectKey& key, const std::vector& slices) ; - + tl::expected StoreObject(const ObjectKey& key, const std::vector& slices) ; + /** * @brief Stores an object from a string * @param key Object identifier * @param str String containing object data - * @return ErrorCode indicating operation status + * @return tl::expected indicating operation status */ - ErrorCode StoreObject(const ObjectKey& key, const std::string& str) ; - + tl::expected StoreObject(const ObjectKey& key, const std::string& str) ; + /** - * @brief Loads an object into slices + * @brief Stores an object from a span of data * @param key Object identifier - * @param slices Output vector for loaded data slices - * @return ErrorCode indicating operation status + * @param data Span containing object data + * @return tl::expected indicating operation status */ - ErrorCode LoadObject(const ObjectKey& key, std::vector& slices) ; - + tl::expected StoreObject(const ObjectKey& key, std::span data); + /** * @brief Loads an object into slices - * @param key Object identifier + * @param path KVCache File path to load from * @param slices Output vector for loaded data slices - * @return ErrorCode indicating operation status + * @param length Expected length of data to read + * @return tl::expected indicating operation status */ - ErrorCode LoadObject(const ObjectKey& key, std::vector& slices, std::string& path) ; + tl::expected LoadObject(std::string& path, std::vector& slices, size_t length) ; /** * @brief Loads an object as a string - * @param key Object identifier + * @param path KVCache File path to load from * @param str Output string for loaded data - * @return ErrorCode indicating operation status + * @param length Expected length of data to read + * @return tl::expected indicating operation status */ - ErrorCode LoadObject(const ObjectKey& key, std::string& str) ; + tl::expected LoadObject(std::string& path, std::string& str, size_t length) ; + + /** + * @brief Checks if an object with the given key exists + * @param key Object identifier + * @return bool indicating whether the object exists + */ + bool Existkey(const ObjectKey& key) ; /** * @brief Queries metadata for an object by key @@ -109,13 +138,6 @@ class StorageBackend { */ std::unordered_map BatchQueryKey(const std::vector& keys); - /** - * @brief Checks if an object with the given key exists - * @param key Object identifier - * @return ErrorCode::OK if exists, ErrorCode::FILE_NOT_FOUND if not exists, other ErrorCode for errors - */ - ErrorCode Existkey(const ObjectKey& key) ; - /** * @brief Deletes the physical file associated with the given object key * @param key Object identifier @@ -129,10 +151,19 @@ class StorageBackend { */ void RemoveAll() ; + enum class FileMode { + Read, + Write + }; // Root directory path for storage and subdirectory name std::string root_dir_; std::string fsdir_; - + + #ifdef USE_3FS + bool is_3fs_dir_{false}; // Flag to indicate if the storage is using 3FS directory structure + std::unique_ptr resource_manager_; + #endif + private: /** * @brief Sanitizes object key for filesystem safety @@ -144,8 +175,14 @@ class StorageBackend { */ std::string ResolvePath(const ObjectKey& key) const; - ErrorCode LoadObjectInPath(const std::string& path, - std::vector& slices); + /** + * @brief Creates a file object for the specified path and mode + * @param path Filesystem path for the file + * @param mode File access mode (read/write) + * @return Unique pointer to the created StorageFile, or nullptr on failure + */ + std::unique_ptr create_file(const std::string& path, + FileMode mode) const; }; diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index 157f9a6e1..bc3d94e82 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -10,7 +10,6 @@ set(MOONCAKE_STORE_SOURCES utils.cpp master_metric_manager.cpp storage_backend.cpp - local_file.cpp thread_pool.cpp etcd_helper.cpp ha_helper.cpp @@ -20,12 +19,28 @@ set(MOONCAKE_STORE_SOURCES ha_helper.cpp rpc_service.cpp offset_allocator.cpp + posix_file.cpp + ) +set(EXTRA_LIBS "") + +if(USE_3FS) + add_subdirectory(hf3fs) + list(APPEND MOONCAKE_STORE_SOURCES ${HF3FS_SOURCES}) + find_library(HF3FS_API_LIB hf3fs_api_shared PATHS /usr/lib NO_DEFAULT_PATH) + if(NOT HF3FS_API_LIB) + message(FATAL_ERROR "hf3fs_api_shared library not found in /usr/lib") + endif() + set(EXTRA_LIBS ${HF3FS_API_LIB}) +endif() + # The cache_allocator library include_directories(${Python3_INCLUDE_DIRS}) add_library(mooncake_store ${MOONCAKE_STORE_SOURCES}) -target_link_libraries(mooncake_store PUBLIC transfer_engine ${ETCD_WRAPPER_LIB} glog::glog gflags::gflags) +target_link_libraries(mooncake_store PUBLIC transfer_engine ${ETCD_WRAPPER_LIB} glog::glog gflags::gflags + ${EXTRA_LIBS} +) if (STORE_USE_ETCD) add_dependencies(mooncake_store build_etcd_wrapper) endif() diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index 93769c901..604ed23f7 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -237,6 +237,7 @@ std::optional> Client::Create( "disabled."; } else { LOG(INFO) << "Storage root directory is: " << storage_root_dir; + LOG(INFO) << "Fs subdir is: " << response.value(); // Initialize storage backend client->PrepareStorageBackend(storage_root_dir, response.value()); } @@ -868,6 +869,23 @@ std::vector> Client::CollectResults( return results; } +void Client::BatchPuttoLocalFile(std::vector& ops) { + if (!storage_backend_) { + return; // No storage backend initialized + } + + for (const auto& op : ops) { + if (op.IsSuccessful()) { + // Store to local file if operation was successful + PutToLocalFile(op.key, op.slices); + } else { + LOG(ERROR) << "Skipping local file storage for key " << op.key + << " due to failure: " + << toString(op.result.error()); + } + } +} + std::vector> Client::BatchPut( const std::vector& keys, std::vector>& batched_slices, @@ -877,6 +895,7 @@ std::vector> Client::BatchPut( SubmitTransfers(ops); WaitForTransfers(ops); FinalizeBatchPut(ops); + BatchPuttoLocalFile(ops); return CollectResults(ops); } @@ -1014,6 +1033,12 @@ tl::expected Client::unregisterLocalMemory( tl::expected Client::IsExist(const std::string& key) { auto result = master_client_.ExistKey(key); if (!result) { + if(storage_backend_) { + // If master query fails, check storage backend + if (storage_backend_->Existkey(key)) { + return true; // Key exists in storage backend + } + } return tl::unexpected(result.error()); } return result.value(); @@ -1050,23 +1075,8 @@ void Client::PrepareStorageBackend(const std::string& storage_root_dir, } } -ErrorCode Client::GetFromLocalFile(const std::string& object_key, - std::vector& slices, - std::vector& replicas) { - if (!storage_backend_) { - return ErrorCode::FILE_READ_FAIL; - } - - ErrorCode err = storage_backend_->LoadObject(object_key, slices); - if (err != ErrorCode::OK) { - return err; - } - - return ErrorCode::OK; -} - void Client::PutToLocalFile(const std::string& key, - std::vector& slices) { + const std::vector& slices) { if (!storage_backend_) return; size_t total_size = 0; @@ -1074,6 +1084,11 @@ void Client::PutToLocalFile(const std::string& key, total_size += slice.size; } + // Currently, persistence is achieved through asynchronous writes, but before asynchronous + // writing in 3FS, significant performance degradation may occur due to data copying. + // Profiling reveals that the number of page faults triggered in this scenario is nearly double the normal count. + // Future plans include introducing a reuse buffer list to address this performance degradation issue. + std::string value; value.reserve(total_size); for (const auto& slice : slices) { diff --git a/mooncake-store/src/hf3fs/CMakeLists.txt b/mooncake-store/src/hf3fs/CMakeLists.txt new file mode 100644 index 000000000..63ae07f4e --- /dev/null +++ b/mooncake-store/src/hf3fs/CMakeLists.txt @@ -0,0 +1,8 @@ +set(HF3FS_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/hf3fs_file.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/hf3fs_resource_manager.cpp +) + +include_directories("include") + +set(HF3FS_SOURCES ${HF3FS_SOURCES} PARENT_SCOPE) \ No newline at end of file diff --git a/mooncake-store/src/hf3fs/README.md b/mooncake-store/src/hf3fs/README.md new file mode 100644 index 000000000..d629dc434 --- /dev/null +++ b/mooncake-store/src/hf3fs/README.md @@ -0,0 +1,42 @@ +# Mooncake HF3FS Plugin + +This plugin implements 3FS native API (USRBIO) as a high-performance storage backend for Mooncake. + +## Prerequisites + +### 1. 3FS Installation +- Build and install [3FS](https://github.com/deepseek-ai/3FS/) +- Required library: `libhf3fs_api_shared.so` (Default location: `3FS_PATH/build/src/lib/api`) + → Install to: `/usr/lib/` +- Required header: `hf3fs_usrbio.h` (Default location: `3FS_PATH/src/lib/api`) + → Install to: `/usr/include/` + +### 2. Mooncake Configuration +- Enable 3FS support during CMake configuration: +```bash + +cmake -DUSE_3FS=ON ... +``` + +- Build and install Mooncake as usual. + +## Usage + +### Basic Operation +Set the environment variable to specify the 3FS mount point: +```bash + +MOONCAKE_STORAGE_ROOT_DIR=/path/to/3fs_mount_point python3 ... +``` +### Important Notes +1. The specified directory **must** be a 3FS mount point + - If not, the system will automatically fall back to POSIX API +2. For optimal performance: + - Ensure proper permissions on the 3FS mount point + - Verify 3FS service is running before execution + +### Example +```bash + +ROLE=prefill MOONCAKE_STORAGE_ROOT_DIR=/mnt/3fs python3 ./stress_cluster_benchmark.py +``` \ No newline at end of file diff --git a/mooncake-store/src/hf3fs/hf3fs_file.cpp b/mooncake-store/src/hf3fs/hf3fs_file.cpp new file mode 100644 index 000000000..0c7be83b2 --- /dev/null +++ b/mooncake-store/src/hf3fs/hf3fs_file.cpp @@ -0,0 +1,363 @@ +#include "file_interface.h" +#include +#include +#include +#include +#include + +namespace mooncake { + +ThreeFSFile::ThreeFSFile(const std::string& filename, int fd, USRBIOResourceManager* resource_manager) + : StorageFile(filename, fd), resource_manager_(resource_manager) {} + +ThreeFSFile::~ThreeFSFile() { + // Deregister and close file descriptor + if (fd_ >= 0) { + hf3fs_dereg_fd(fd_); + if (close(fd_) == -1) { + LOG(WARNING) << "Failed to close file: " << filename_; + } + fd_ = -1; + } + + // Delete potentially corrupted file if write failed + if (error_code_ == ErrorCode::FILE_WRITE_FAIL) { + if (::unlink(filename_.c_str()) == -1) { + LOG(ERROR) << "Failed to delete corrupted file: " << filename_; + } else { + LOG(INFO) << "Deleted corrupted file: " << filename_; + } + } +} + +tl::expected ThreeFSFile::write(const std::string& buffer, size_t length) { + return write(std::span(buffer.data(), length), length); +} + +tl::expected ThreeFSFile::write(std::span data, size_t length) { + // 1. Parameter validation + if (length == 0) { + return make_error(ErrorCode::FILE_INVALID_BUFFER); + } + + // 2. Get thread resources + auto* resource = resource_manager_->getThreadResource(); + if (!resource || !resource->initialized) { + return make_error(ErrorCode::FILE_OPEN_FAIL); + } + + // 3. Acquire write lock + auto lock = acquire_write_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + // 4. Write in chunks + auto& threefs_iov = resource->iov_; + auto& ior_write = resource->ior_write_; + const char* data_ptr = data.data(); + size_t total_bytes_written = 0; + off_t current_offset = 0; + const size_t max_chunk_size = resource->config_.iov_size; + + while (total_bytes_written < length) { + // Calculate current chunk size + size_t chunk_size = std::min(length - total_bytes_written, max_chunk_size); + + // Copy data to shared buffer + memcpy(threefs_iov.base, data_ptr + total_bytes_written, chunk_size); + + // Prepare IO request + int ret = hf3fs_prep_io(&ior_write, &threefs_iov, false, + threefs_iov.base, fd_, current_offset, chunk_size, nullptr); + if (ret < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + // Submit IO request + ret = hf3fs_submit_ios(&ior_write); + if (ret < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + // Wait for IO completion + struct hf3fs_cqe cqe; + ret = hf3fs_wait_for_ios(&ior_write, &cqe, 1, 1, nullptr); + if (ret < 0 || cqe.result < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + size_t bytes_written = cqe.result; + total_bytes_written += bytes_written; + current_offset += bytes_written; + + if (bytes_written < chunk_size) { + break; // Short write, possibly disk full + } + } + + if(total_bytes_written != length) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + return total_bytes_written; +} + +tl::expected ThreeFSFile::read(std::string& buffer, size_t length) { + // 1. Parameter validation + if (length == 0) { + return make_error(ErrorCode::FILE_INVALID_BUFFER); + } + + // 2. Get thread resources + auto* resource = resource_manager_->getThreadResource(); + if (!resource || !resource->initialized) { + return make_error(ErrorCode::FILE_OPEN_FAIL); + } + + // 3. Acquire read lock + auto lock = acquire_read_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + // 4. Prepare buffer + buffer.clear(); + buffer.reserve(length); + size_t total_bytes_read = 0; + off_t current_offset = 0; + auto& threefs_iov = resource->iov_; + auto& ior_read = resource->ior_read_; + + // 5. Read in chunks + while (total_bytes_read < length) { + // Calculate current chunk size + size_t chunk_size = std::min( + length - total_bytes_read, + resource->config_.iov_size + ); + + // Prepare IO request + int ret = hf3fs_prep_io(&ior_read, &threefs_iov, true, + threefs_iov.base, fd_, current_offset, chunk_size, nullptr); + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + // Submit IO request + ret = hf3fs_submit_ios(&ior_read); + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + // Wait for IO completion + struct hf3fs_cqe cqe; + ret = hf3fs_wait_for_ios(&ior_read, &cqe, 1, 1, nullptr); + if (ret < 0 || cqe.result < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + size_t bytes_read = cqe.result; + if (bytes_read == 0) { // EOF + break; + } + + // Append data to buffer + buffer.append(reinterpret_cast(threefs_iov.base), bytes_read); + total_bytes_read += bytes_read; + current_offset += bytes_read; + + if (bytes_read < chunk_size) { // Short read + break; + } + } + + if(total_bytes_read != length) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + return total_bytes_read; +} + +tl::expected ThreeFSFile::vector_write(const iovec* iov, int iovcnt, off_t offset) { + auto* resource = resource_manager_->getThreadResource(); + if (!resource || !resource->initialized) { + return make_error(ErrorCode::FILE_OPEN_FAIL); + } + + auto& threefs_iov = resource->iov_; + auto& ior_write = resource->ior_write_; + + auto lock = acquire_write_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + // 1. Calculate total length + size_t total_length = 0; + for (int i = 0; i < iovcnt; ++i) { + total_length += iov[i].iov_len; + } + + size_t total_bytes_written = 0; + off_t current_offset = offset; + size_t bytes_remaining = total_length; + int current_iov_index = 0; + size_t current_iov_offset = 0; + + while (bytes_remaining > 0) { + // 2. Determine current write chunk size (not exceeding shared buffer size) + size_t current_chunk_size = std::min( + bytes_remaining, + resource->config_.iov_size + ); + + // 3. Copy data from user IOV to shared buffer + size_t bytes_copied = 0; + char* dest_ptr = reinterpret_cast(threefs_iov.base); + + while (bytes_copied < current_chunk_size && current_iov_index < iovcnt) { + const iovec* current_iov = &iov[current_iov_index]; + size_t copy_size = std::min( + current_chunk_size - bytes_copied, + current_iov->iov_len - current_iov_offset + ); + + memcpy( + dest_ptr + bytes_copied, + reinterpret_cast(current_iov->iov_base) + current_iov_offset, + copy_size + ); + + bytes_copied += copy_size; + current_iov_offset += copy_size; + + if (current_iov_offset >= current_iov->iov_len) { + current_iov_index++; + current_iov_offset = 0; + } + } + + // 4. Prepare and submit IO request + int ret = hf3fs_prep_io(&ior_write, &threefs_iov, false, + threefs_iov.base, fd_, current_offset, current_chunk_size, nullptr); + if (ret < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + ret = hf3fs_submit_ios(&ior_write); + if (ret < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + // 5. Wait for IO completion + struct hf3fs_cqe cqe; + ret = hf3fs_wait_for_ios(&ior_write, &cqe, 1, 1, nullptr); + if (ret < 0 || cqe.result < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + size_t bytes_written = cqe.result; + total_bytes_written += bytes_written; + bytes_remaining -= bytes_written; + current_offset += bytes_written; + + if (bytes_written < current_chunk_size) { + break; // Short write, possibly disk full + } + } + + return total_bytes_written; +} + +tl::expected ThreeFSFile::vector_read(const iovec* iov, int iovcnt, off_t offset) { + auto* resource = resource_manager_->getThreadResource(); + if (!resource || !resource->initialized) { + return make_error(ErrorCode::FILE_OPEN_FAIL); + } + + auto& threefs_iov = resource->iov_; + auto& ior_read = resource->ior_read_; + + auto lock = acquire_read_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + // Calculate total length + size_t total_length = 0; + for (int i = 0; i < iovcnt; ++i) { + total_length += iov[i].iov_len; + } + + size_t total_bytes_read = 0; + off_t current_offset = offset; + size_t bytes_remaining = total_length; + int current_iov_index = 0; + size_t current_iov_offset = 0; + + while(bytes_remaining > 0) { + // Determine current block size + size_t current_chunk_size = + std::min(bytes_remaining, resource->config_.iov_size); + + // Prepare IO request + int ret = hf3fs_prep_io(&ior_read, &threefs_iov, true, + threefs_iov.base, fd_, current_offset, current_chunk_size, nullptr); + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + // Submit IO request + ret = hf3fs_submit_ios(&ior_read); + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + // Wait for IO completion + struct hf3fs_cqe cqe; + ret = hf3fs_wait_for_ios(&ior_read, &cqe, 1, 1, nullptr); + size_t bytes_read = cqe.result; + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + // Copy data from shared buffer to user IOV + size_t bytes_to_copy = bytes_read; + char* src_ptr = reinterpret_cast(threefs_iov.base); + + while (bytes_to_copy > 0 && current_iov_index < iovcnt) { + const iovec* current_iov = &iov[current_iov_index]; + size_t copy_size = std::min( + bytes_to_copy, + current_iov->iov_len - current_iov_offset + ); + + memcpy( + static_cast(current_iov->iov_base) + current_iov_offset, + src_ptr, + copy_size + ); + + src_ptr += copy_size; + bytes_to_copy -= copy_size; + total_bytes_read += copy_size; + bytes_remaining -= copy_size; + current_offset += copy_size; + + current_iov_offset += copy_size; + if (current_iov_offset >= current_iov->iov_len) { + current_iov_index++; + current_iov_offset = 0; + } + } + if(bytes_read < current_chunk_size) { + // If bytes read is less than requested chunk size, we've reached EOF + break; + } + } + + return total_bytes_read; +} + +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/hf3fs/hf3fs_resource_manager.cpp b/mooncake-store/src/hf3fs/hf3fs_resource_manager.cpp new file mode 100644 index 000000000..0af52c1fa --- /dev/null +++ b/mooncake-store/src/hf3fs/hf3fs_resource_manager.cpp @@ -0,0 +1,92 @@ +#include "file_interface.h" + +namespace mooncake { +// ============================================================================ +// USRBIO Resource manager Implementation +// ============================================================================ +bool ThreadUSRBIOResource::Initialize(const Hf3fsConfig &config) { + if (initialized) { + return true; + } + + this->config_ = config; + + // Create shared memory + int ret = + hf3fs_iovcreate(&iov_, config.mount_root.c_str(), config.iov_size, 0, -1); + if (ret < 0) { + return false; + } + + // Create read I/O ring + ret = + hf3fs_iorcreate4(&ior_read_, config.mount_root.c_str(), config.ior_entries, + true, config.io_depth, config.ior_timeout, -1, 0); + if (ret < 0) { + hf3fs_iovdestroy(&iov_); + return false; + } + + // Create write I/O ring + ret = hf3fs_iorcreate4(&ior_write_, config.mount_root.c_str(), + config.ior_entries, false, config.io_depth, + config.ior_timeout, -1, 0); + if (ret < 0) { + hf3fs_iordestroy(&ior_read_); + hf3fs_iovdestroy(&iov_); + return false; + } + + initialized = true; + return true; +} + +void ThreadUSRBIOResource::Cleanup() { + if (!initialized) { + return; + } + + // Destroy USRBIO resources + hf3fs_iordestroy(&ior_write_); + hf3fs_iordestroy(&ior_read_); + hf3fs_iovdestroy(&iov_); + + initialized = false; +} + +// Resource manager implementation +struct ThreadUSRBIOResource *USRBIOResourceManager::getThreadResource( + const Hf3fsConfig &config) { + std::thread::id thread_id = std::this_thread::get_id(); + + { + std::lock_guard lock(resource_map_mutex); + + // Find if current thread already has resources + auto it = thread_resources.find(thread_id); + if (it != thread_resources.end()) { + return it->second; + } + + // Create new thread resources + ThreadUSRBIOResource *resource = new ThreadUSRBIOResource(); + if (!resource->Initialize(config)) { + delete resource; + return nullptr; + } + + // Store resource mapping + thread_resources[thread_id] = resource; + return resource; + } +} + +USRBIOResourceManager::~USRBIOResourceManager() { + // Clean up all thread resources + for (auto &pair : thread_resources) { + delete pair.second; + } + thread_resources.clear(); +} + +} \ No newline at end of file diff --git a/mooncake-store/src/local_file.cpp b/mooncake-store/src/local_file.cpp deleted file mode 100644 index 1a271626c..000000000 --- a/mooncake-store/src/local_file.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#include "local_file.h" - -namespace mooncake { -LocalFile::LocalFile(const std::string& filename,FILE *file,ErrorCode ec) : filename_(filename),file_(file),error_code_(ec) { - if (!file_ || ferror(file_)) { - error_code_ = ErrorCode::FILE_INVALID_HANDLE; - } else if (ec != ErrorCode::OK) { - error_code_ = ec; - } -} - -LocalFile::~LocalFile() { - if (file_) { - release_lock(); - if (fclose(file_) != 0) { - LOG(WARNING) << "Failed to close file: " << filename_; - } - // If the file was opened with an error code indicating a write failure, - // attempt to delete the file to prevent corruption. - if (error_code_ == ErrorCode::FILE_WRITE_FAIL) { - if (::unlink(filename_.c_str()) == -1) { - LOG(ERROR) << "Failed to delete corrupted file: " << filename_; - } else { - LOG(INFO) << "Deleted corrupted file: " << filename_; - } - } - } - file_ = nullptr; -} - -ssize_t LocalFile::write(const std::string &buffer, size_t length){ - if (file_ == nullptr) { - error_code_ = ErrorCode::FILE_NOT_FOUND; - return -1; - } - if (length == 0 || buffer.empty()) { - error_code_ = ErrorCode::FILE_INVALID_BUFFER; - return -1; - } - - if(length > static_cast(std::numeric_limits::max())) { - error_code_ = ErrorCode::FILE_INVALID_BUFFER; - return -1; - } - - if (acquire_write_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - return -1; - } - - size_t remaining = length; - size_t written_bytes = 0; - const char* ptr = buffer.data(); - - while (remaining > 0) { - size_t written = fwrite(ptr, 1, remaining, file_); - if (written == 0) break; - remaining -= written; - ptr += written; - written_bytes += written; - } - - if (remaining > 0) { - error_code_ = ErrorCode::FILE_WRITE_FAIL; - return -1; - } - - if (release_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - LOG(INFO) << "Failed to release lock on file: " << filename_; - } - - if (ferror(file_)) { - error_code_ = ErrorCode::FILE_WRITE_FAIL; - return -1; - } - - return written_bytes; -} - -ssize_t LocalFile::read(std::string &buffer, size_t length){ - if (file_ == nullptr) { - error_code_ = ErrorCode::FILE_NOT_FOUND; - return -1; - } - if (length == 0) { - error_code_ = ErrorCode::FILE_INVALID_BUFFER; - return -1; - } - - if(length > static_cast(std::numeric_limits::max())) { - error_code_ = ErrorCode::FILE_INVALID_BUFFER; - return -1; - } - - if (acquire_read_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - return -1; - } - - buffer.resize(length); - size_t read_bytes = fread(&buffer[0], 1, length, file_); - - if (release_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - LOG(INFO) << "Failed to release lock on file: " << filename_; - } - - if (ferror(file_)) { - error_code_ = ErrorCode::FILE_READ_FAIL; - buffer.clear(); - return -1; - } - - buffer.resize(read_bytes); // shrink to actual read size - return read_bytes; -} - -ssize_t LocalFile::pwritev(const iovec *iov, int iovcnt, off_t offset){ - if(!file_){ - error_code_ = ErrorCode::FILE_NOT_FOUND; - return -1; - } - - int fd=fileno(file_); - - if (fd == -1) { - error_code_ = ErrorCode::FILE_INVALID_HANDLE; - LOG(ERROR) << "Invalid file handle for: " << filename_; - return -1; - } - - if (acquire_write_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - return -1; - } - - size_t total_length = 0; - for (int i = 0; i < iovcnt; ++i) { - total_length += iov[i].iov_len; - } - - ssize_t ret = ::pwritev(fd, iov, iovcnt, offset); - - if (release_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - LOG(INFO) << "Failed to release lock on file: " << filename_; - } - if (ret < 0) { - error_code_ = ErrorCode::FILE_WRITE_FAIL; - return -1; - } - - return ret; -} - - -ssize_t LocalFile::preadv(const iovec *iov, int iovcnt, off_t offset){ - if(!file_){ - error_code_ = ErrorCode::FILE_NOT_FOUND; - return -1; - } - - int fd=fileno(file_); - - if (fd == -1) { - error_code_ = ErrorCode::FILE_INVALID_HANDLE; - LOG(ERROR) << "Invalid file handle for: " << filename_; - return -1; - } - - if (acquire_read_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - return -1; - } - - ssize_t ret = ::preadv(fd, iov, iovcnt, offset); - - if (release_lock() == -1) { - error_code_ = ErrorCode::FILE_LOCK_FAIL; - LOG(INFO) << "Failed to release lock on file: " << filename_; - } - - if (ret < 0) { - error_code_ = ErrorCode::FILE_READ_FAIL; - return -1; - } - return ret; -} - -int LocalFile::acquire_write_lock(){ - if (flock(fileno(file_), LOCK_EX) == -1) { - return -1; - } - is_locked_ = true; - return 0; -} - -int LocalFile::acquire_read_lock(){ - if (flock(fileno(file_), LOCK_SH) == -1) { - return -1; - } - is_locked_ = true; - return 0; -} - -int LocalFile::release_lock(){ - if (!is_locked_) return 0; - if (flock(fileno(file_), LOCK_UN) == -1) { - return -1; - } - is_locked_ = false; - return 0; -} - -} \ No newline at end of file diff --git a/mooncake-store/src/posix_file.cpp b/mooncake-store/src/posix_file.cpp new file mode 100644 index 000000000..cceb553a2 --- /dev/null +++ b/mooncake-store/src/posix_file.cpp @@ -0,0 +1,151 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "file_interface.h" + +namespace mooncake { +PosixFile::PosixFile(const std::string& filename, int fd) : StorageFile(filename, fd) { + if (fd < 0) { + error_code_ = ErrorCode::FILE_INVALID_HANDLE; + } +} + +PosixFile::~PosixFile() { + if (fd_ >= 0) { + if (close(fd_) != 0) { + LOG(WARNING) << "Failed to close file: " << filename_; + } + // If the file was opened with an error code indicating a write failure, + // attempt to delete the file to prevent corruption. + if (error_code_ == ErrorCode::FILE_WRITE_FAIL) { + if (::unlink(filename_.c_str()) == -1) { + LOG(ERROR) << "Failed to delete corrupted file: " << filename_; + } else { + LOG(INFO) << "Deleted corrupted file: " << filename_; + } + } + } + fd_ = -1; +} + +tl::expected PosixFile::write(const std::string &buffer, size_t length) { + return write(std::span(buffer.data(), length), length); +} + +tl::expected PosixFile::write(std::span data, size_t length) { + + if (fd_ < 0) { + return make_error(ErrorCode::FILE_NOT_FOUND); + } + + if (length == 0) { + return make_error(ErrorCode::FILE_INVALID_BUFFER); + } + + auto lock = acquire_write_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + size_t remaining = length; + size_t written_bytes = 0; + const char* ptr = data.data(); + + while (remaining > 0) { + ssize_t written = ::write(fd_, ptr, remaining); + if (written == -1) { + if (errno == EINTR) continue; + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + remaining -= written; + ptr += written; + written_bytes += written; + } + + if(written_bytes != length) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + return written_bytes; +} + +tl::expected PosixFile::read(std::string &buffer, size_t length) { + + if (fd_ < 0) { + return make_error(ErrorCode::FILE_NOT_FOUND); + } + + if (length == 0) { + return make_error(ErrorCode::FILE_INVALID_BUFFER); + } + + auto lock = acquire_read_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + buffer.resize(length); + size_t read_bytes = 0; + char* ptr = buffer.data(); + + while (read_bytes < length) { + ssize_t n = ::read(fd_, ptr, length - read_bytes); + if (n == -1) { + if (errno == EINTR) continue; + buffer.clear(); + return make_error(ErrorCode::FILE_READ_FAIL); + } + if (n == 0) break; // EOF + read_bytes += n; + ptr += n; + } + + buffer.resize(read_bytes); + if(read_bytes != length) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + return read_bytes; +} + +tl::expected PosixFile::vector_write(const iovec *iov, int iovcnt, off_t offset) { + if (fd_ < 0) { + return make_error(ErrorCode::FILE_NOT_FOUND); + } + + auto lock = acquire_write_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + ssize_t ret = ::pwritev(fd_, iov, iovcnt, offset); + if (ret < 0) { + return make_error(ErrorCode::FILE_WRITE_FAIL); + } + + return ret; +} + +tl::expected PosixFile::vector_read(const iovec *iov, int iovcnt, off_t offset) { + if (fd_ < 0) { + return make_error(ErrorCode::FILE_NOT_FOUND); + } + + auto lock = acquire_read_lock(); + if (!lock.is_locked()) { + return make_error(ErrorCode::FILE_LOCK_FAIL); + } + + ssize_t ret = ::preadv(fd_, iov, iovcnt, offset); + if (ret < 0) { + return make_error(ErrorCode::FILE_READ_FAIL); + } + + return ret; +} + +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/storage_backend.cpp b/mooncake-store/src/storage_backend.cpp index b1d72ed30..4ed88a0a2 100644 --- a/mooncake-store/src/storage_backend.cpp +++ b/mooncake-store/src/storage_backend.cpp @@ -10,174 +10,142 @@ namespace mooncake { -ErrorCode StorageBackend::StoreObject(const ObjectKey& key, +tl::expected StorageBackend::StoreObject(const ObjectKey& key, const std::vector& slices) { std::string path = ResolvePath(key); - if(std::filesystem::exists(path) == true) { - return ErrorCode::FILE_OPEN_FAIL; + if(std::filesystem::exists(path)) { + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } - FILE* file = fopen(path.c_str(), "wb"); - size_t slices_total_size = 0; - std::vector iovs; - + auto file = create_file(path, FileMode::Write); if (!file) { LOG(INFO) << "Failed to open file for writing: " << path; - return ErrorCode::FILE_OPEN_FAIL; - } - - LocalFile local_file(path,file,ErrorCode::OK); + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); + } + std::vector iovs; + size_t slices_total_size = 0; for (const auto& slice : slices) { iovec io{ slice.ptr, slice.size }; iovs.push_back(io); slices_total_size += slice.size; } - ssize_t ret = local_file.pwritev(iovs.data(), static_cast(iovs.size()), 0); - - if (ret < 0) { - LOG(INFO) << "pwritev failed for: " << path; - return ErrorCode::FILE_WRITE_FAIL; + auto write_result = file->vector_write(iovs.data(), static_cast(iovs.size()), 0); + if (!write_result) { + LOG(INFO) << "vector_write failed for: " << path << ", error: " << write_result.error(); + return tl::make_unexpected(write_result.error()); } - if (ret != static_cast(slices_total_size)) { + if (*write_result != slices_total_size) { LOG(INFO) << "Write size mismatch for: " << path - << ", expected: " << slices_total_size - << ", got: " << ret; - return ErrorCode::FILE_WRITE_FAIL; + << ", expected: " << slices_total_size + << ", got: " << *write_result; + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); } - // TODO: Determine whether the data has been completely and correctly written. - // If the write operation fails, the corresponding file should be deleted - // to prevent incomplete data from being found in subsequent get operations. - // Alternatively, a marking method can be used to record that the file is not a valid file. - // Note: fclose is not necessary here as LocalFile destructor will handle it - - return ErrorCode::OK; + return {}; } -ErrorCode StorageBackend::StoreObject(const ObjectKey& key, +tl::expected StorageBackend::StoreObject(const ObjectKey& key, const std::string& str) { + return StoreObject(key, std::span(str.data(), str.size())); +} + +tl::expected StorageBackend::StoreObject(const ObjectKey& key, + std::span data) { std::string path = ResolvePath(key); - if(std::filesystem::exists(path) == true) { - return ErrorCode::FILE_OPEN_FAIL; + if (std::filesystem::exists(path)) { + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } - FILE* file = fopen(path.c_str(), "wb"); - size_t file_total_size=str.size(); - + auto file = create_file(path, FileMode::Write); if (!file) { - LOG(INFO) << "Failed to open file for reading: " << path; - return ErrorCode::FILE_OPEN_FAIL; + LOG(INFO) << "Failed to open file for writing: " << path; + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } - LocalFile local_file(path,file,ErrorCode::OK); - - ssize_t ret = local_file.write(str, file_total_size); - - if (ret < 0) { - LOG(INFO) << "pwritev failed for: " << path; + size_t file_total_size = data.size(); + auto write_result = file->write(data, file_total_size); - return ErrorCode::FILE_WRITE_FAIL; + if (!write_result) { + LOG(INFO) << "Write failed for: " << path << ", error: " << write_result.error(); + return tl::make_unexpected(write_result.error()); } - if (ret != static_cast(file_total_size)) { + if (*write_result != file_total_size) { LOG(INFO) << "Write size mismatch for: " << path - << ", expected: " << file_total_size - << ", got: " << ret; - - return ErrorCode::FILE_WRITE_FAIL; + << ", expected: " << file_total_size + << ", got: " << *write_result; + return tl::make_unexpected(ErrorCode::FILE_WRITE_FAIL); } - // Note: fclose is not necessary here as LocalFile destructor will handle it - - return ErrorCode::OK; -} - -ErrorCode StorageBackend::LoadObject(const ObjectKey& key, - std::vector& slices) { - std::string path = ResolvePath(key); - return LoadObjectInPath(path,slices); -} -ErrorCode StorageBackend::LoadObject(const ObjectKey& key, - std::vector& slices, std::string& path) { - return LoadObjectInPath(path, slices); + return {}; } -ErrorCode StorageBackend::LoadObjectInPath(const std::string& path, - std::vector& slices) { - FILE* file = fopen(path.c_str(), "rb"); - size_t slices_total_size=0; - std::vector iovs; - +tl::expected StorageBackend::LoadObject(std::string& path, + std::vector& slices, size_t length) { + auto file = create_file(path, FileMode::Read); if (!file) { LOG(INFO) << "Failed to open file for reading: " << path; - return ErrorCode::FILE_OPEN_FAIL; + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } - LocalFile local_file(path,file,ErrorCode::OK); - + std::vector iovs; for (const auto& slice : slices) { iovec io{ slice.ptr, slice.size }; iovs.push_back(io); - slices_total_size += slice.size; } - ssize_t ret = local_file.preadv(iovs.data(), static_cast(iovs.size()), 0); - - if (ret < 0) { - LOG(INFO) << "preadv failed for: " << path; - - return ErrorCode::FILE_READ_FAIL; + auto read_result = file->vector_read(iovs.data(), static_cast(iovs.size()), 0); + if (!read_result) { + LOG(INFO) << "vector_read failed for: " << path << ", error: " << read_result.error(); + return tl::make_unexpected(read_result.error()); } - if (ret != static_cast(slices_total_size)) { + if (*read_result != length) { LOG(INFO) << "Read size mismatch for: " << path - << ", expected: " << slices_total_size - << ", got: " << ret; - - return ErrorCode::FILE_READ_FAIL; + << ", expected: " << length + << ", got: " << *read_result; + return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); } - // Note: fclose is not necessary here as LocalFile destructor will handle it - - return ErrorCode::OK; + return {}; } -ErrorCode StorageBackend::LoadObject(const ObjectKey& key, - std::string& str) { - std::string path = ResolvePath(key); - FILE* file = fopen(path.c_str(), "rb"); - size_t file_total_size=0; - +tl::expected StorageBackend::LoadObject(std::string& path, + std::string& str, size_t length) { + auto file = create_file(path, FileMode::Read); if (!file) { - return ErrorCode::FILE_OPEN_FAIL; + LOG(INFO) << "Failed to open file for reading: " << path; + return tl::make_unexpected(ErrorCode::FILE_OPEN_FAIL); } - fseek(file, 0, SEEK_END); - file_total_size = ftell(file); - fseek(file, 0, SEEK_SET); - - LocalFile local_file(path,file,ErrorCode::OK); - - ssize_t ret = local_file.read(str, file_total_size); - - if (ret < 0) { - LOG(INFO) << "preadv failed for: " << path; - return ErrorCode::FILE_READ_FAIL; + auto read_result = file->read(str, length); + if (!read_result) { + LOG(INFO) << "read failed for: " << path << ", error: " << read_result.error(); + return tl::make_unexpected(read_result.error()); } - if (ret != static_cast(file_total_size)) { + if (*read_result != length) { LOG(INFO) << "Read size mismatch for: " << path - << ", expected: " << file_total_size - << ", got: " << ret; - - return ErrorCode::FILE_READ_FAIL; + << ", expected: " << length + << ", got: " << *read_result; + return tl::make_unexpected(ErrorCode::FILE_READ_FAIL); } - // Note: fclose is not necessary here as LocalFile destructor will handle it + return {}; +} + +bool StorageBackend::Existkey(const ObjectKey& key) { + std::string path = ResolvePath(key); + namespace fs = std::filesystem; - return ErrorCode::OK; + // Check if the file exists + if (fs::exists(path)) { + return true; + } else { + return false; + } } std::optional StorageBackend::Querykey(const ObjectKey& key) { @@ -224,18 +192,6 @@ StorageBackend::BatchQueryKey(const std::vector& keys) { return result; } -ErrorCode StorageBackend::Existkey(const ObjectKey& key) { - std::string path = ResolvePath(key); - namespace fs = std::filesystem; - - // Check if the file exists - if (fs::exists(path)) { - return ErrorCode::OK; - } else { - return ErrorCode::FILE_NOT_FOUND; - } -} - void StorageBackend::RemoveFile(const ObjectKey& key) { std::string path = ResolvePath(key); namespace fs = std::filesystem; @@ -310,4 +266,36 @@ std::string StorageBackend::ResolvePath(const ObjectKey& key) const { return full_path.lexically_normal().string(); } +std::unique_ptr StorageBackend::create_file( + const std::string& path, FileMode mode) const { + int flags = O_CLOEXEC; + int access_mode = 0; + switch (mode) { + case FileMode::Read: + access_mode = O_RDONLY; + break; + case FileMode::Write: + access_mode = O_WRONLY | O_CREAT | O_TRUNC; + break; + } + + int fd = open(path.c_str(), flags | access_mode, 0644); + if (fd < 0) { + return nullptr; + } + +#ifdef USE_3FS + if (is_3fs_dir_) { + if (hf3fs_reg_fd(fd, 0) > 0) { + close(fd); + return nullptr; + } + return resource_manager_ ? + std::make_unique(path, fd, resource_manager_.get()) : nullptr; + } +#endif + + return std::make_unique(path, fd); +} + } // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index da360d66e..96f2df830 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -12,8 +12,8 @@ namespace mooncake { // ============================================================================ // FilereadWorkerPool Implementation // ============================================================================ -//to fully utilize the available ssd bandwidth, we use a default of 8 worker threads. -constexpr int kDefaultFilereadWorkers = 8; +//to fully utilize the available ssd bandwidth, we use a default of 10 worker threads. +constexpr int kDefaultFilereadWorkers = 10; FilereadWorkerPool::FilereadWorkerPool(std::shared_ptr& backend) : shutdown_(false) { VLOG(1) << "Creating FilereadWorkerPool with " << kDefaultFilereadWorkers @@ -88,18 +88,18 @@ void FilereadWorkerPool::workerThread() { if (!backend_) { LOG(ERROR) << "Backend is not initialized, cannot load object"; task.state->set_completed(ErrorCode::TRANSFER_FAIL); - continue; + continue; } - auto error_code = backend_->LoadObject("", task.slices, task.file_path); - if(error_code == ErrorCode::OK){ - VLOG(2) << "Fileread task completed successfully with " - << task.file_path; + auto load_result = backend_->LoadObject(task.file_path, task.slices, task.file_size); + if (load_result) { + VLOG(2) << "Fileread task completed successfully with " + << task.file_path ; task.state->set_completed(ErrorCode::OK); - }else{ - LOG(ERROR) << "Fileread task failed for file: " - << task.file_path - << " with error code: " << toString(error_code); + } else { + LOG(ERROR) << "Fileread task failed for file: " + << task.file_path + << " with error: " << toString(load_result.error()); task.state->set_completed(ErrorCode::TRANSFER_FAIL); } } catch (const std::exception& e) { diff --git a/mooncake-store/tests/CMakeLists.txt b/mooncake-store/tests/CMakeLists.txt index 5b1f4c84a..59d71265a 100644 --- a/mooncake-store/tests/CMakeLists.txt +++ b/mooncake-store/tests/CMakeLists.txt @@ -46,8 +46,8 @@ target_link_libraries(stress_workload_test PUBLIC pthread ) -add_executable(local_file_test local_file_test.cpp) -target_link_libraries(local_file_test PUBLIC +add_executable(posix_file_test posix_file_test.cpp) +target_link_libraries(posix_file_test PUBLIC mooncake_store cachelib_memory_allocator glog @@ -55,7 +55,7 @@ target_link_libraries(local_file_test PUBLIC gtest_main pthread ) -add_test(NAME local_file_test COMMAND local_file_test) +add_test(NAME posix_file_test COMMAND posix_file_test) add_executable(thread_pool_test thread_pool_test.cpp) target_link_libraries(thread_pool_test PUBLIC diff --git a/mooncake-store/tests/local_file_test.cpp b/mooncake-store/tests/local_file_test.cpp deleted file mode 100644 index 85c068c74..000000000 --- a/mooncake-store/tests/local_file_test.cpp +++ /dev/null @@ -1,165 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "local_file.h" - -namespace mooncake { - -// Test fixture for LocalFile tests -class LocalFileTest : public ::testing::Test { -protected: - void SetUp() override { - google::InitGoogleLogging("LocalFileTest"); - FLAGS_logtostderr = 1; - - // Create a test file - test_filename = "test_file.txt"; - std::ofstream outfile(test_filename); - outfile << "Initial test content"; - outfile.close(); - } - - void TearDown() override { - google::ShutdownGoogleLogging(); - - // Remove the test file - remove(test_filename.c_str()); - } - - std::string test_filename; -}; - -// Test basic file creation and destruction -TEST_F(LocalFileTest, FileLifecycle) { - FILE* file = fopen(test_filename.c_str(), "r+"); - ASSERT_NE(file, nullptr); - - LocalFile local_file(test_filename, file); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::OK); - - // Destructor will close the file -} - -// Test basic write operation -TEST_F(LocalFileTest, BasicWrite) { - FILE* file = fopen(test_filename.c_str(), "r+"); - ASSERT_NE(file, nullptr); - - LocalFile local_file(test_filename, file); - - std::string test_data = "Test write data"; - ssize_t result = local_file.write(test_data, test_data.size()); - - EXPECT_EQ(result, test_data.size()); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::OK); -} - -// Test basic read operation -TEST_F(LocalFileTest, BasicRead) { - // First write some data - { - FILE* file = fopen(test_filename.c_str(), "w"); - ASSERT_NE(file, nullptr); - LocalFile local_file(test_filename, file); - std::string test_data = "Test read data"; - local_file.write(test_data, test_data.size()); - } - - // Now read it back - FILE* file = fopen(test_filename.c_str(), "r"); - ASSERT_NE(file, nullptr); - - LocalFile local_file(test_filename, file); - - std::string buffer; - ssize_t result = local_file.read(buffer, 100); // Read up to 100 bytes - - EXPECT_EQ(result, 14); // "Test read data" is 14 bytes - EXPECT_EQ(buffer, "Test read data"); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::OK); -} - -// Test vectorized write operation -TEST_F(LocalFileTest, VectorizedWrite) { - FILE* file = fopen(test_filename.c_str(), "r+"); - ASSERT_NE(file, nullptr); - - LocalFile local_file(test_filename, file); - - std::string data1 = "First part "; - std::string data2 = "Second part"; - - iovec iov[2]; - iov[0].iov_base = const_cast(data1.data()); - iov[0].iov_len = data1.size(); - iov[1].iov_base = const_cast(data2.data()); - iov[1].iov_len = data2.size(); - - ssize_t result = local_file.pwritev(iov, 2, 0); - - EXPECT_EQ(result, data1.size() + data2.size()); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::OK); -} - -// Test vectorized read operation -TEST_F(LocalFileTest, VectorizedRead) { - // First write some data - { - FILE* file = fopen(test_filename.c_str(), "w"); - ASSERT_NE(file, nullptr); - LocalFile local_file(test_filename, file); - std::string test_data = "Vectorized read test data"; - local_file.write(test_data, test_data.size()); - } - - // Now read it back with vectorized read - FILE* file = fopen(test_filename.c_str(), "r"); - ASSERT_NE(file, nullptr); - - LocalFile local_file(test_filename, file); - - char buf1[10]; - char buf2[15]; - - iovec iov[2]; - iov[0].iov_base = buf1; - iov[0].iov_len = sizeof(buf1); - iov[1].iov_base = buf2; - iov[1].iov_len = sizeof(buf2); - - ssize_t result = local_file.preadv(iov, 2, 0); - - EXPECT_EQ(result, 25); // "Vectorized read test data" is 25 bytes - EXPECT_EQ(std::string(buf1, 10), "Vectorized"); - EXPECT_EQ(std::string(buf2, 15), " read test data"); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::OK); -} - -// Test error cases -TEST_F(LocalFileTest, ErrorCases) { - // Test invalid file handle - LocalFile local_file("nonexistent.txt", nullptr, ErrorCode::OK); - EXPECT_EQ(local_file.get_error_code(), ErrorCode::FILE_INVALID_HANDLE); - - // Test write to invalid file - std::string test_data = "test"; - ssize_t result = local_file.write(test_data, test_data.size()); - EXPECT_EQ(result, -1); - - // Test read from invalid file - std::string buffer; - result = local_file.read(buffer, 10); - EXPECT_EQ(result, -1); -} - - -} // namespace mooncake - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file diff --git a/mooncake-store/tests/posix_file_test.cpp b/mooncake-store/tests/posix_file_test.cpp new file mode 100644 index 000000000..f15762d98 --- /dev/null +++ b/mooncake-store/tests/posix_file_test.cpp @@ -0,0 +1,174 @@ +#include +#include +#include +#include +#include +#include "file_interface.h" + +namespace mooncake { + +class PosixFileTest : public ::testing::Test { +protected: + void SetUp() override { + google::InitGoogleLogging("PosixFileTest"); + FLAGS_logtostderr = 1; + + // Create and open a test file + test_filename = "test_file.txt"; + test_fd = open(test_filename.c_str(), O_CREAT | O_RDWR, 0644); + ASSERT_GE(test_fd, 0) << "Failed to open test file"; + } + + void TearDown() override { + google::ShutdownGoogleLogging(); + if (test_fd >= 0) { + close(test_fd); + } + remove(test_filename.c_str()); + } + + std::string test_filename; + int test_fd = -1; +}; + +// Test basic file lifecycle +TEST_F(PosixFileTest, FileLifecycle) { + PosixFile posix_file(test_filename, test_fd); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::OK); + // Destructor will close the file +} + +// Test basic write operation +TEST_F(PosixFileTest, BasicWrite) { + PosixFile posix_file(test_filename, test_fd); + + std::string test_data = "Test write data"; + auto result = posix_file.write(test_data, test_data.size()); + + ASSERT_TRUE(result) << "Write failed with error: " << toString(result.error()); + EXPECT_EQ(*result, test_data.size()); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::OK); +} + +// Test basic read operation +TEST_F(PosixFileTest, BasicRead) { + // Clear file content + ASSERT_EQ(ftruncate(test_fd, 0), 0) << "Failed to truncate file"; + ASSERT_NE(lseek(test_fd, 0, SEEK_SET), -1) << "Seek failed"; + + // Write test data + const char* test_data = "Test read data"; + ssize_t written = write(test_fd, test_data, strlen(test_data)); + ASSERT_EQ(written, static_cast(strlen(test_data))) << "Write failed"; + ASSERT_NE(lseek(test_fd, 0, SEEK_SET), -1) << "Seek failed"; + + PosixFile posix_file(test_filename, test_fd); + + std::string buffer; + auto result = posix_file.read(buffer, strlen(test_data)); // Read up to test_data bytes + + ASSERT_TRUE(result) << "Read failed with error: " << toString(result.error()); + EXPECT_EQ(*result, strlen(test_data)); + EXPECT_EQ(buffer, test_data); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::OK); +} + +// Test vectorized write operation +TEST_F(PosixFileTest, VectorizedWrite) { + PosixFile posix_file(test_filename, test_fd); + + std::string data1 = "First part "; + std::string data2 = "Second part"; + + iovec iov[2]; + iov[0].iov_base = const_cast(data1.data()); + iov[0].iov_len = data1.size(); + iov[1].iov_base = const_cast(data2.data()); + iov[1].iov_len = data2.size(); + + auto result = posix_file.vector_write(iov, 2, 0); + + ASSERT_TRUE(result) << "Vector write failed with error: " << toString(result.error()); + EXPECT_EQ(*result, data1.size() + data2.size()); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::OK); +} + +// Test vectorized read operation +TEST_F(PosixFileTest, VectorizedRead) { + // Clear file content + ASSERT_EQ(ftruncate(test_fd, 0), 0) << "Failed to truncate file"; + ASSERT_NE(lseek(test_fd, 0, SEEK_SET), -1) << "Seek failed"; + + // Write test data + const char* test_data = "Vectorized read test data"; + ssize_t written = write(test_fd, test_data, strlen(test_data)); + ASSERT_EQ(written, static_cast(strlen(test_data))) << "Write failed"; + ASSERT_NE(lseek(test_fd, 0, SEEK_SET), -1) << "Seek failed"; + + PosixFile posix_file(test_filename, test_fd); + + char buf1[11] = {0}; // "Vectorized" + null + char buf2[16] = {0}; // " read test data" + null + + iovec iov[2]; + iov[0].iov_base = buf1; + iov[0].iov_len = 10; // Exact length of "Vectorized" + iov[1].iov_base = buf2; + iov[1].iov_len = 15; // Exact length of " read test data" + + auto result = posix_file.vector_read(iov, 2, 0); + + ASSERT_TRUE(result) << "Vector read failed with error: " << toString(result.error()); + EXPECT_EQ(*result, strlen(test_data)); + EXPECT_STREQ(buf1, "Vectorized"); + EXPECT_STREQ(buf2, " read test data"); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::OK); +} + +// Test error cases +TEST_F(PosixFileTest, ErrorCases) { + // Test invalid file descriptor + PosixFile posix_file("invalid.txt", -1); + EXPECT_EQ(posix_file.get_error_code(), ErrorCode::FILE_INVALID_HANDLE); + + // Test write to invalid file + std::string test_data = "test"; + auto write_result = posix_file.write(test_data, test_data.size()); + EXPECT_FALSE(write_result); + EXPECT_EQ(write_result.error(), ErrorCode::FILE_NOT_FOUND); + + // Test read from invalid file + std::string buffer; + auto read_result = posix_file.read(buffer, test_data.size()); + EXPECT_FALSE(read_result); + EXPECT_EQ(read_result.error(), ErrorCode::FILE_NOT_FOUND); +} + +// Test file locking +TEST_F(PosixFileTest, FileLocking) { + PosixFile posix_file(test_filename, test_fd); + + { + // Acquire write lock + auto lock = posix_file.acquire_write_lock(); + EXPECT_TRUE(lock.is_locked()); + + // Try to read while locked + std::string buffer; + auto result = posix_file.read(buffer, 10); + EXPECT_FALSE(result); + } + + { + // Acquire read lock + auto lock = posix_file.acquire_read_lock(); + EXPECT_TRUE(lock.is_locked()); + } +} + +} // namespace mooncake + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/mooncake-store/tests/stress_cluster_benchmark.py b/mooncake-store/tests/stress_cluster_benchmark.py index 5bf1abf89..5e7ac56c1 100644 --- a/mooncake-store/tests/stress_cluster_benchmark.py +++ b/mooncake-store/tests/stress_cluster_benchmark.py @@ -177,6 +177,10 @@ def __init__(self, args): def setup(self): """Initialize the MooncakeDistributedStore and allocate registered memory.""" + if self.args.root_dir: + os.environ["MOONCAKE_STORAGE_ROOT_DIR"] = self.args.root_dir + logger.info(f"Set storage root directory to: {self.args.root_dir}") + self.store = MooncakeDistributedStore() self.performance_tracker.start_timer() @@ -441,6 +445,7 @@ def parse_arguments(): parser.add_argument("--value-length", type=int, default=4*1024*1024, help="Size of each value in bytes") parser.add_argument("--batch-size", type=int, default=1, help="Batch size for operations") parser.add_argument("--wait-time", type=int, default=20, help="Wait time in seconds after operations complete") + parser.add_argument("--root-dir", type=str, default="", help="Root directory for storage (sets MOONCAKE_STORAGE_ROOT_DIR)") # Multi-threading parameters parser.add_argument("--num-workers", type=int, default=1,