4 changes: 3 additions & 1 deletion doc/en/mooncake-store.md
@@ -471,7 +471,9 @@ Currently, an approximate LRU policy is adopted, where the least recently used o

### Lease

To avoid data conflicts, a per-object lease will be granted whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. An object is guaranteed to be protected from `Remove` request, `RemoveAll` request and `Eviction` task until its lease expires. A `Remove` request on a leased object will fail. A `RemoveAll` request will only remove objects without a lease.
To avoid data conflicts, a per-object lease is granted whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. While the lease is active, the object is protected from `Remove`, `RemoveAll`, and `Eviction` operations. Specifically, a `Remove` request targeting a leased object will fail, and a `RemoveAll` request will only delete objects without an active lease. This ensures that the object’s data can be safely read as long as the lease has not expired.
Collaborator

Just a suggestion: maybe we could write "during the lease, the object is read-only (or immutable)".

Collaborator Author

The immutability of an object once it is put is a fundamental property of the store, not limited to the lease period. Let me introduce this feature in another section.

Collaborator Author

Updated in another PR: 4cc34ba


However, if the lease expires before a `Get` operation finishes reading the data, the operation will be considered failed, and no data will be returned, in order to prevent potential data corruption.

The default lease TTL is 5 seconds and is configurable via a startup parameter of `master_service`.
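
The failure rule described above can be sketched as a small guard around the data transfer. This is only an illustration under stated assumptions: `SketchError`, `ReadUnderLease`, and the `transfer` callback are hypothetical names, not the actual Mooncake client code. The idea is to refuse to start a read once the deadline has passed, and to re-check the deadline after the transfer returns so that data read under an expired lease is discarded.

```cpp
#include <cassert>
#include <chrono>
#include <functional>

// Hypothetical names for illustration; not the actual Mooncake client code.
enum class SketchError { OK, LEASE_EXPIRED };

using Clock = std::chrono::steady_clock;

// Runs `transfer` only while the lease is still valid, then checks the
// deadline again. If the lease ran out during the transfer, the caller is
// told to treat the read as failed and discard whatever was copied.
inline SketchError ReadUnderLease(Clock::time_point lease_deadline,
                                  const std::function<void()>& transfer) {
    if (Clock::now() >= lease_deadline) {
        return SketchError::LEASE_EXPIRED;  // lease already gone: do not read
    }
    transfer();
    if (Clock::now() >= lease_deadline) {
        return SketchError::LEASE_EXPIRED;  // object may have been evicted mid-read
    }
    return SketchError::OK;
}
```

Checking after the transfer, not only before it, is what lets the reader detect the case where the object lost its protection while the data was still being copied.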

4 changes: 3 additions & 1 deletion doc/zh/mooncake-store.md
@@ -475,7 +475,9 @@ virtual std::unique_ptr<AllocatedBuffer> Allocate(

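### Lease Mechanism

To avoid data conflicts, the system grants a lease on the corresponding object whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. Until the lease expires, the object is protected from deletion by `Remove`, `RemoveAll`, or eviction tasks. A `Remove` request on a leased object fails; a `RemoveAll` request deletes only objects without a lease.
To avoid data conflicts, the system grants a lease on the corresponding object whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. Until the lease expires, the object is protected from deletion by `Remove`, `RemoveAll`, or eviction tasks. A `Remove` request on a leased object fails; a `RemoveAll` request deletes only objects without a lease. This guarantees that the object's data can be read safely as long as the lease has not expired.

However, if the lease expires before a `Get` operation finishes reading the data, the operation is treated as failed and no data is returned, to prevent potential data corruption.

The default lease duration is 5 seconds and can be configured through a startup parameter of `master_service`.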

5 changes: 4 additions & 1 deletion docs/source/design/mooncake-store.md
@@ -468,9 +468,12 @@ The strategy automatically handles cases where the preferred segment is unavaila
When a `PutStart` request fails due to insufficient memory, or when the eviction thread detects that space usage has reached the configured high watermark (95% by default, configurable via `-eviction_high_watermark_ratio`), an eviction task is triggered to free up space by evicting a portion of objects (5% by default, configurable via `-eviction_ratio`). Similar to `Remove`, evicted objects are simply marked as deleted, with no data transfer required.

Currently, an approximate LRU policy is adopted, where the least recently used objects are preferred for eviction. To avoid data races and corruption, objects currently being read or written by clients should not be evicted. For this reason, objects that have leases or have not been marked as complete by `PutEnd` requests will be ignored by the eviction task.

### Lease

To avoid data conflicts, a per-object lease will be granted whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. An object is guaranteed to be protected from `Remove` request, `RemoveAll` request and `Eviction` task until its lease expires. A `Remove` request on a leased object will fail. A `RemoveAll` request will only remove objects without a lease.
To avoid data conflicts, a per-object lease is granted whenever an `ExistKey` request or a `GetReplicaListRequest` request succeeds. While the lease is active, the object is protected from `Remove`, `RemoveAll`, and `Eviction` operations. Specifically, a `Remove` request targeting a leased object will fail, and a `RemoveAll` request will only delete objects without an active lease. This ensures that the object’s data can be safely read as long as the lease has not expired.

However, if the lease expires before a `Get` operation finishes reading the data, the operation will be considered failed, and no data will be returned, in order to prevent potential data corruption.

The default lease TTL is 5 seconds and is configurable via a startup parameter of `master_service`.

66 changes: 45 additions & 21 deletions mooncake-store/include/client.h
@@ -8,6 +8,7 @@
#include <thread>
#include <vector>
#include <ylt/util/tl/expected.hpp>
#include <chrono>

#include "client_metric.h"
#include "ha_helper.h"
@@ -23,6 +24,31 @@ namespace mooncake {

class PutOperation;

/**
 * @brief Result of a query operation containing replica information and lease
 * timeout
 */
class QueryResult {
   public:
    /** @brief List of available replicas for the queried key */
    const std::vector<Replica::Descriptor> replicas;
    /** @brief Time point when the lease for this key expires */
    const std::chrono::steady_clock::time_point lease_timeout;

    QueryResult(std::vector<Replica::Descriptor>&& replicas_param,
                std::chrono::steady_clock::time_point lease_timeout_param)
        : replicas(std::move(replicas_param)),
          lease_timeout(lease_timeout_param) {}

    bool IsLeaseExpired() const {
        return std::chrono::steady_clock::now() >= lease_timeout;
    }

    bool IsLeaseExpired(std::chrono::steady_clock::time_point& now) const {
        return now >= lease_timeout;
    }
};
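
One plausible reason for the overload that takes `now` is to let a batch of results be checked against a single clock reading. The sketch below illustrates that usage with a trimmed stand-in type; `LeaseOnlyResult` and `CountExpired` are hypothetical and exist only for this example, and the stand-in omits the replica list so the block compiles on its own.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for QueryResult that keeps only the lease deadline.
struct LeaseOnlyResult {
    Clock::time_point lease_timeout;

    bool IsLeaseExpired(const Clock::time_point& now) const {
        return now >= lease_timeout;
    }
};

// Counts expired entries using one clock reading for the whole batch, so
// every entry is judged against the same instant.
inline std::size_t CountExpired(const std::vector<LeaseOnlyResult>& results,
                                Clock::time_point now) {
    std::size_t expired = 0;
    for (const auto& result : results) {
        if (result.IsLeaseExpired(now)) {
            ++expired;
        }
    }
    return expired;
}
```

Because the comparison is `now >= lease_timeout`, a lease whose deadline equals the sampled time counts as expired.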

/**
* @brief Client for interacting with the mooncake distributed object store
*/
@@ -71,11 +97,10 @@ class Client {
/**
* @brief Gets object metadata without transferring data
* @param object_key Key to query
* @param object_info Output parameter for object metadata
* @return ErrorCode indicating success/failure
* @return QueryResult containing replicas and lease timeout, or ErrorCode
* indicating failure
*/
tl::expected<std::vector<Replica::Descriptor>, ErrorCode> Query(
const std::string& object_key);
tl::expected<QueryResult, ErrorCode> Query(const std::string& object_key);

/**
* @brief Queries replica lists for object keys that match a regex pattern.
@@ -91,33 +116,33 @@ class Client {
/**
* @brief Batch query object metadata without transferring data
* @param object_keys Keys to query
* @param object_infos Output parameter for object metadata
* @return Vector of QueryResult objects containing replicas and lease
* timeouts
*/

std::vector<tl::expected<std::vector<Replica::Descriptor>, ErrorCode>>
BatchQuery(const std::vector<std::string>& object_keys);
std::vector<tl::expected<QueryResult, ErrorCode>> BatchQuery(
const std::vector<std::string>& object_keys);

/**
* @brief Transfers data using pre-queried object information
* @param object_key Key of the object
* @param replica_list Previously queried replica list
* @param query_result Previously queried object metadata containing
* replicas and lease timeout
* @param slices Vector of slices to store the data
* @return ErrorCode indicating success/failure
*/
tl::expected<void, ErrorCode> Get(
const std::string& object_key,
const std::vector<Replica::Descriptor>& replica_list,
std::vector<Slice>& slices);
tl::expected<void, ErrorCode> Get(const std::string& object_key,
const QueryResult& query_result,
std::vector<Slice>& slices);
/**
* @brief Transfers data using pre-queried object information
* @param object_keys Keys of the objects
* @param object_infos Previously queried object metadata
* @param query_results Previously queried object metadata for each key
* @param slices Map of object keys to their data slices
* @return ErrorCode indicating success/failure
* @return Vector of ErrorCode results for each object
*/
std::vector<tl::expected<void, ErrorCode>> BatchGet(
const std::vector<std::string>& object_keys,
const std::vector<std::vector<Replica::Descriptor>>& replica_lists,
const std::vector<QueryResult>& query_results,
std::unordered_map<std::string, std::vector<Slice>>& slices);

/**
@@ -214,8 +239,7 @@ class Client {
/**
* @brief Checks if multiple objects exist
* @param keys Vector of keys to check
* @param exist_results Output vector of existence results for each key
* @return ErrorCode indicating success/failure of the batch operation
* @return Vector of existence results for each key
*/
std::vector<tl::expected<bool, ErrorCode>> BatchIsExist(
const std::vector<std::string>& keys);
@@ -257,12 +281,12 @@ class Client {
const std::string& local_hostname,
const std::string& metadata_connstring, const std::string& protocol,
const std::optional<std::string>& device_names);
ErrorCode TransferData(const Replica::Descriptor& replica,
ErrorCode TransferData(const Replica::Descriptor& replica_descriptor,
std::vector<Slice>& slices,
TransferRequest::OpCode op_code);
ErrorCode TransferWrite(const Replica::Descriptor& replica,
ErrorCode TransferWrite(const Replica::Descriptor& replica_descriptor,
std::vector<Slice>& slices);
ErrorCode TransferRead(const Replica::Descriptor& replica,
ErrorCode TransferRead(const Replica::Descriptor& replica_descriptor,
std::vector<Slice>& slices);

/**
7 changes: 4 additions & 3 deletions mooncake-store/include/master_client.h
@@ -10,6 +10,7 @@
#include "client_metric.h"
#include "replica.h"
#include "types.h"
#include "rpc_types.h"

namespace mooncake {

@@ -67,7 +68,7 @@ class MasterClient {
* @param object_info Output parameter for object metadata
* @return ErrorCode indicating success/failure
*/
[[nodiscard]] tl::expected<std::vector<Replica::Descriptor>, ErrorCode>
[[nodiscard]] tl::expected<GetReplicaListResponse, ErrorCode>
GetReplicaList(const std::string& object_key);

/**
@@ -88,8 +89,8 @@ class MasterClient {
* @param object_infos Output parameter for object metadata
* @return ErrorCode indicating success/failure
*/
[[nodiscard]] std::vector<
tl::expected<std::vector<Replica::Descriptor>, ErrorCode>>
[[nodiscard]]
std::vector<tl::expected<GetReplicaListResponse, ErrorCode>>
BatchGetReplicaList(const std::vector<std::string>& object_keys);

/**
53 changes: 53 additions & 0 deletions mooncake-store/include/master_config.h
@@ -1,5 +1,6 @@
#pragma once

#include <optional>
#include <stdexcept>

#include "config_helper.h"
@@ -361,4 +362,56 @@ inline MasterServiceConfigBuilder MasterServiceConfig::builder() {
return MasterServiceConfigBuilder();
}

// Configuration for InProcMaster (in-process master server for testing)
struct InProcMasterConfig {
    std::optional<int> rpc_port;
    std::optional<int> http_metrics_port;
    std::optional<int> http_metadata_port;
    std::optional<uint64_t> default_kv_lease_ttl;
};

// Builder class for InProcMasterConfig
class InProcMasterConfigBuilder {
   private:
    std::optional<int> rpc_port_ = std::nullopt;
    std::optional<int> http_metrics_port_ = std::nullopt;
    std::optional<int> http_metadata_port_ = std::nullopt;
    std::optional<uint64_t> default_kv_lease_ttl_ = std::nullopt;

   public:
    InProcMasterConfigBuilder() = default;

    InProcMasterConfigBuilder& set_rpc_port(int port) {
        rpc_port_ = port;
        return *this;
    }

    InProcMasterConfigBuilder& set_http_metrics_port(int port) {
        http_metrics_port_ = port;
        return *this;
    }

    InProcMasterConfigBuilder& set_http_metadata_port(int port) {
        http_metadata_port_ = port;
        return *this;
    }

    InProcMasterConfigBuilder& set_default_kv_lease_ttl(uint64_t ttl) {
        default_kv_lease_ttl_ = ttl;
        return *this;
    }

    InProcMasterConfig build() const;
};

// Implementation of InProcMasterConfigBuilder::build()
inline InProcMasterConfig InProcMasterConfigBuilder::build() const {
    InProcMasterConfig config;
    config.rpc_port = rpc_port_;
    config.http_metrics_port = http_metrics_port_;
    config.http_metadata_port = http_metadata_port_;
    config.default_kv_lease_ttl = default_kv_lease_ttl_;
    return config;
}

} // namespace mooncake
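
Every field of `InProcMasterConfig` is a `std::optional`, which suggests that an unset field means "use the default". A minimal sketch of how a consumer might resolve such a field follows. `SketchConfig`, `ResolveLeaseTtl`, and `kAssumedDefaultLeaseTtlMs` are hypothetical; the 5000 value mirrors the documented 5-second default TTL, and treating the field as milliseconds is an assumption the diff does not confirm.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical, trimmed stand-in for InProcMasterConfig (two fields only).
struct SketchConfig {
    std::optional<int> rpc_port;
    std::optional<uint64_t> default_kv_lease_ttl;
};

// Assumed fallback: 5000 mirrors the documented 5 s default lease TTL.
// The unit (milliseconds) is an assumption made for this sketch.
constexpr uint64_t kAssumedDefaultLeaseTtlMs = 5000;

// Returns the explicitly configured TTL, or the assumed default when the
// field was never set on the builder.
inline uint64_t ResolveLeaseTtl(const SketchConfig& config) {
    return config.default_kv_lease_ttl.value_or(kAssumedDefaultLeaseTtlMs);
}
```

Keeping the fields optional lets a test override only the lease TTL, for example to a very short value that forces expiry, while leaving the ports at whatever the in-process master chooses.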
11 changes: 2 additions & 9 deletions mooncake-store/include/master_service.h
@@ -22,6 +22,7 @@
#include "segment.h"
#include "types.h"
#include "master_config.h"
#include "rpc_types.h"
#include "replica.h"

namespace mooncake {
@@ -127,15 +128,7 @@ class MasterService {
* ready
*/
auto GetReplicaList(std::string_view key)
-> tl::expected<std::vector<Replica::Descriptor>, ErrorCode>;

/**
* @brief Get list of replicas for a batch of objects
* @param[out] batch_replica_list Vector to store replicas information for
* slices
*/
std::vector<tl::expected<std::vector<Replica::Descriptor>, ErrorCode>>
BatchGetReplicaList(const std::vector<std::string>& keys);
-> tl::expected<GetReplicaListResponse, ErrorCode>;

/**
* @brief Start a put operation for an object
5 changes: 3 additions & 2 deletions mooncake-store/include/rpc_service.h
@@ -9,6 +9,7 @@

#include "master_service.h"
#include "types.h"
#include "rpc_types.h"
#include "master_config.h"

namespace mooncake {
@@ -33,10 +34,10 @@ class WrappedMasterService {
ErrorCode>
GetReplicaListByRegex(const std::string& str);

tl::expected<std::vector<Replica::Descriptor>, ErrorCode> GetReplicaList(
tl::expected<GetReplicaListResponse, ErrorCode> GetReplicaList(
const std::string& key);

std::vector<tl::expected<std::vector<Replica::Descriptor>, ErrorCode>>
std::vector<tl::expected<GetReplicaListResponse, ErrorCode>>
BatchGetReplicaList(const std::vector<std::string>& keys);

tl::expected<std::vector<Replica::Descriptor>, ErrorCode> PutStart(
43 changes: 43 additions & 0 deletions mooncake-store/include/rpc_types.h
@@ -0,0 +1,43 @@
#pragma once

#include "types.h"
#include "replica.h"

namespace mooncake {

/**
 * @brief Response structure for Ping operation
 */
struct PingResponse {
    ViewVersionId view_version_id;
    ClientStatus client_status;

    PingResponse() = default;
    PingResponse(ViewVersionId view_version, ClientStatus status)
        : view_version_id(view_version), client_status(status) {}

    friend std::ostream& operator<<(std::ostream& os,
                                    const PingResponse& response) noexcept {
        return os << "PingResponse: { view_version_id: "
                  << response.view_version_id
                  << ", client_status: " << response.client_status << " }";
    }
};
YLT_REFL(PingResponse, view_version_id, client_status);

/**
 * @brief Response structure for GetReplicaList operation
 */
struct GetReplicaListResponse {
    std::vector<Replica::Descriptor> replicas;
    uint64_t lease_ttl_ms;

    GetReplicaListResponse() : lease_ttl_ms(0) {}
    GetReplicaListResponse(std::vector<Replica::Descriptor>&& replicas_param,
                           uint64_t lease_ttl_ms_param)
        : replicas(std::move(replicas_param)),
          lease_ttl_ms(lease_ttl_ms_param) {}
};
YLT_REFL(GetReplicaListResponse, replicas, lease_ttl_ms);

} // namespace mooncake
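
`GetReplicaListResponse` carries a relative TTL in milliseconds, while `QueryResult` holds an absolute `steady_clock` deadline. A relative value is the natural choice on the wire because `std::chrono::steady_clock` readings are not comparable across machines. The sketch below shows one way the conversion could be done on the client. `LeaseDeadline` is a hypothetical helper, and measuring from the moment the request was sent is an assumption chosen so that network latency shortens the client's view of the lease instead of extending it.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: converts a relative TTL (in ms) from the master into
// a local deadline. `request_sent_at` is sampled before the RPC is issued,
// which keeps the client-side deadline conservative.
inline Clock::time_point LeaseDeadline(Clock::time_point request_sent_at,
                                       uint64_t lease_ttl_ms) {
    return request_sent_at + std::chrono::milliseconds(lease_ttl_ms);
}
```

With this approach a TTL of 0, the value set by the default constructor of `GetReplicaListResponse`, yields a deadline equal to the send time, so the lease would be treated as already expired.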
29 changes: 6 additions & 23 deletions mooncake-store/include/types.h
@@ -107,13 +107,16 @@ enum class ErrorCode : int32_t {
INVALID_PARAMS = -600, ///< Invalid parameters.

// Engine operation errors (Range: -700 to -799)
INVALID_WRITE = -700, ///< Invalid write operation.
INVALID_READ = -701, ///< Invalid read operation.
INVALID_REPLICA = -702, ///< Invalid replica operation.
INVALID_WRITE = -700, ///< Invalid write operation.
INVALID_READ = -701, ///< Invalid read operation.
INVALID_REPLICA = -702, ///< Invalid replica operation.

// Object errors (Range: -703 to -707)
REPLICA_IS_NOT_READY = -703, ///< Replica is not ready.
OBJECT_NOT_FOUND = -704, ///< Object not found.
OBJECT_ALREADY_EXISTS = -705, ///< Object already exists.
OBJECT_HAS_LEASE = -706, ///< Object has lease.
LEASE_EXPIRED = -707, ///< Lease expired before data transfer completed.

// Transfer errors (Range: -800 to -899)
TRANSFER_FAIL = -800, ///< Transfer operation failed.
@@ -202,26 +205,6 @@ inline std::ostream& operator<<(std::ostream& os,
return os;
}

/**
* @brief Response structure for Ping operation
*/
struct PingResponse {
ViewVersionId view_version_id;
ClientStatus client_status;

PingResponse() = default;
PingResponse(ViewVersionId view_version, ClientStatus status)
: view_version_id(view_version), client_status(status) {}

friend std::ostream& operator<<(std::ostream& os,
const PingResponse& response) noexcept {
return os << "PingResponse: { view_version_id: "
<< response.view_version_id
<< ", client_status: " << response.client_status << " }";
}
};
YLT_REFL(PingResponse, view_version_id, client_status);

enum class BufferAllocatorType {
CACHELIB = 0, // CachelibBufferAllocator
OFFSET = 1, // OffsetBufferAllocator