Skip to content

Commit

Permalink
Make Open/R KvStore initial and max backoff time configurable via Ope…
Browse files Browse the repository at this point in the history
…nrConfig

Summary:
- Make kvstore initial and max backoff configurable, use defaults when not configured
- modify requestThriftPeerSync to use appropriate backoff parameters
- Expose processThriftFailure as a public API to allow test code to make use of it
- Enhance kvstore wrapper to provide APIs to test-code, in order to trigger processThriftFailure
- Add UT automation

Reviewed By: xiangxu1121

Differential Revision: D54129764

fbshipit-source-id: 211d97df6430fa13e59265fefcac7e981273e739
  • Loading branch information
Shitanshu Shah authored and facebook-github-bot committed Mar 5, 2024
1 parent 4cad9c6 commit 0476c96
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 24 deletions.
2 changes: 2 additions & 0 deletions openr/config/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ Config::toThriftKvStoreConfig() const {
config.node_name() = getNodeName();
config.key_ttl_ms() = *oldConfig.key_ttl_ms();
config.ttl_decrement_ms() = *oldConfig.ttl_decrement_ms();
config.sync_initial_backoff_ms() = *oldConfig.sync_initial_backoff_ms();
config.sync_max_backoff_ms() = *oldConfig.sync_max_backoff_ms();

if (auto floodRate = oldConfig.flood_rate()) {
thrift::KvStoreFloodRate rate;
Expand Down
2 changes: 2 additions & 0 deletions openr/if/KvStore.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ struct KvStoreConfig {
13: optional string x509_ca_path;
/** Knob to enable/disable TLS thrift client. */
14: bool enable_secure_thrift_client = false;
15: i32 sync_initial_backoff_ms = 4000;
16: i32 sync_max_backoff_ms = 256000;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions openr/if/OpenrConfig.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ struct KvstoreConfig {
*/
6: optional list<string> key_prefix_filters;
7: optional list<string> key_originator_id_filters;
/**
* Configurable Initial and Max backoffs for kvstore peer full sync
*/
8: i32 sync_initial_backoff_ms = 4000;
9: i32 sync_max_backoff_ms = 256000;
}

/*
Expand Down
76 changes: 69 additions & 7 deletions openr/kvstore/KvStore-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ KvStore<ClientType>::KvStore(
kvParams_.maybeIpTos.value(),
*kvStoreConfig.node_name());
}
// Sanitize the configured initial backoff for peer full sync: a value that is
// zero or negative is replaced with the compiled-in default.
if (*kvStoreConfig.sync_initial_backoff_ms() <= 0) {
  XLOG(INFO) << fmt::format(
      "non-positive sync initial backoff ms {}, re-setting to {}",
      *kvStoreConfig.sync_initial_backoff_ms(),
      std::chrono::milliseconds(Constants::kKvstoreSyncInitialBackoff)
          .count());

  kvParams_.syncInitialBackoff = Constants::kKvstoreSyncInitialBackoff;
} else {
  kvParams_.syncInitialBackoff =
      std::chrono::milliseconds(*kvStoreConfig.sync_initial_backoff_ms());
}

// Sanitize the configured max backoff: it must be strictly greater than the
// (already sanitized) initial backoff, otherwise it is replaced.
if (*kvStoreConfig.sync_max_backoff_ms() <=
    std::chrono::milliseconds(kvParams_.syncInitialBackoff).count()) {
  if (kvParams_.syncInitialBackoff < Constants::kKvstoreSyncMaxBackoff) {
    // the default max is still above the initial backoff, so use it
    kvParams_.syncMaxBackoff = Constants::kKvstoreSyncMaxBackoff;
  } else {
    // to be tuned if this case is of interest
    kvParams_.syncMaxBackoff = (kvParams_.syncInitialBackoff * 2);
  }

  XLOG(INFO) << fmt::format(
      "sync max backoff ms {} not greater than initial backoff, re-setting to {}",
      *kvStoreConfig.sync_max_backoff_ms(),
      kvParams_.syncMaxBackoff.count());
} else {
  kvParams_.syncMaxBackoff =
      std::chrono::milliseconds(*kvStoreConfig.sync_max_backoff_ms());
}

// Record the effective values that will be used.
XLOG(INFO) << fmt::format(
    "Initial backoff {} and Max backoff {}",
    kvParams_.syncInitialBackoff.count(),
    kvParams_.syncMaxBackoff.count());

{
auto fiber = addFiberTaskFuture(
Expand Down Expand Up @@ -520,6 +555,29 @@ KvStore<ClientType>::semifuture_setKvStoreKeyValues(
return sf;
}

/*
 * Inject a thrift failure for `peerName` in `area`; per the commit summary
 * this exists so test code can trigger processThriftFailure().
 *
 * The work is handed to runInEventBaseThread(); there the area's KvStoreDb is
 * looked up and processThriftFailure() is invoked for the peer.
 *
 * @param area      area whose KvStoreDb should process the failure
 * @param peerName  peer the failure is attributed to
 * @return SemiFuture fulfilled with `true` after processThriftFailure()
 *         returns, or carrying the thrift::KvStoreError caught while handling
 *         the request
 */
template <class ClientType>
folly::SemiFuture<std::unique_ptr<bool>>
KvStore<ClientType>::semifuture_injectThriftFailure(
    std::string area, std::string peerName) {
  folly::Promise<std::unique_ptr<bool>> p;
  auto sf = p.getSemiFuture();
  runInEventBaseThread([this,
                        p = std::move(p),
                        area = std::move(area),
                        peerName = std::move(peerName)]() mutable {
    try {
      // Tag the lookup with this API's own name rather than "disconnectPeer".
      auto& kvStoreDb = getAreaDbOrThrow(area, "injectThriftFailure");
      kvStoreDb.processThriftFailure(
          peerName,
          "injected thrift failure",
          std::chrono::milliseconds(500)); // arbitrary timeDelta value
      p.setValue(std::make_unique<bool>(true));
    } catch (thrift::KvStoreError const& e) {
      p.setException(e);
    }
  });
  return sf;
}

template <class ClientType>
folly::SemiFuture<std::optional<thrift::KvStorePeerState>>
KvStore<ClientType>::semifuture_getKvStorePeerState(
Expand Down Expand Up @@ -1049,6 +1107,11 @@ KvStoreDb<ClientType>::KvStorePeer::KvStorePeer(
CHECK(not this->peerSpec.peerAddr()->empty());
CHECK(
this->expBackoff.getInitialBackoff() <= this->expBackoff.getMaxBackoff());
XLOG(INFO) << fmt::format(
"node: {}, initial backoff {} and max backoff {}",
nodeName,
this->expBackoff.getInitialBackoff().count(),
this->expBackoff.getMaxBackoff().count());
}

template <class ClientType>
Expand Down Expand Up @@ -1599,7 +1662,7 @@ KvStoreDb<ClientType>::advertiseSelfOriginatedKeys() {
// Build keys to be cleaned from local storage
std::vector<std::string> keysToClear;

std::chrono::milliseconds timeout = Constants::kMaxBackoff;
std::chrono::milliseconds timeout = kvParams_.syncMaxBackoff;
for (auto const& key : keysToAdvertise_) {
// Each key was introduced through a persistSelfOriginatedKey() call.
// Therefore, each key is in selfOriginatedKeyVals_ and has a keyBackoff.
Expand Down Expand Up @@ -1918,7 +1981,7 @@ template <class ClientType>
void
KvStoreDb<ClientType>::requestThriftPeerSync() {
// minimal timeout for next run
auto timeout = std::chrono::milliseconds(Constants::kKvstoreSyncMaxBackoff);
auto timeout = kvParams_.syncMaxBackoff;

// pre-fetch of peers in "SYNCING" state for later calculation
uint32_t numThriftPeersInSync =
Expand All @@ -1935,7 +1998,7 @@ KvStoreDb<ClientType>::requestThriftPeerSync() {
}

// update the global minimum timeout value for next try
if (not thriftPeer.expBackoff.canTryNow()) {
if (not expBackoff.canTryNow()) {
timeout = std::min(timeout, expBackoff.getTimeRemainingUntilRetry());
continue;
}
Expand Down Expand Up @@ -2006,9 +2069,9 @@ KvStoreDb<ClientType>::requestThriftPeerSync() {
});

// in case pending peer size is over parallelSyncLimit,
// wait until kMaxBackoff before sending next round of sync
// wait until syncInitialBackoff before sending next round of sync
if (numThriftPeersInSync > parallelSyncLimitOverThrift_) {
timeout = Constants::kKvstoreSyncInitialBackoff;
timeout = kvParams_.syncInitialBackoff;
XLOG(INFO)
<< AreaTag()
<< fmt::format(
Expand Down Expand Up @@ -2317,8 +2380,7 @@ KvStoreDb<ClientType>::addThriftPeers(
AreaTag(),
newPeerSpec,
ExponentialBackoff<std::chrono::milliseconds>(
Constants::kKvstoreSyncInitialBackoff,
Constants::kKvstoreSyncMaxBackoff),
kvParams_.syncInitialBackoff, kvParams_.syncMaxBackoff),
kvParams_);
peer.peerSpec.stateEpochTimeMs() = getTimeSinceEpochMs();
peer.peerSpec.flaps() = -1;
Expand Down
37 changes: 20 additions & 17 deletions openr/kvstore/KvStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,23 @@ class KvStoreDb {
void unsetSelfOriginatedKey(std::string const& key, std::string const& value);
void eraseSelfOriginatedKey(std::string const& key);

/*
* [Initial Sync]
*
* util method to process thrift sync response in:
* 1) Success
* 2) Failure
*/
void processThriftSuccess(
std::string const& peerName,
thrift::Publication&& pub,
std::chrono::milliseconds timeDelta);

void processThriftFailure(
std::string const& peerName,
folly::fbstring const& exceptionStr,
std::chrono::milliseconds timeDelta);

private:
// disable copying
KvStoreDb(KvStoreDb const&) = delete;
Expand Down Expand Up @@ -243,23 +260,6 @@ class KvStoreDb {
void finalizeFullSync(
const std::unordered_set<std::string>& keys, const std::string& senderId);

/*
* [Initial Sync]
*
* util method to process thrift sync response in:
* 1) Success
* 2) Failure
*/
void processThriftSuccess(
std::string const& peerName,
thrift::Publication&& pub,
std::chrono::milliseconds timeDelta);

void processThriftFailure(
std::string const& peerName,
folly::fbstring const& exceptionStr,
std::chrono::milliseconds timeDelta);

/*
* [Version Inconsistency Mitigation]
*/
Expand Down Expand Up @@ -609,6 +609,9 @@ class KvStore final : public OpenrEventBase {
semifuture_setKvStoreKeyValues(
std::string area, thrift::KeySetParams keySetParams);

/*
 * Run KvStoreDb::processThriftFailure() for `peerName` in `area` on the
 * event-base thread. The returned future holds `true` on completion, or a
 * thrift::KvStoreError if one is caught while handling the request.
 */
folly::SemiFuture<std::unique_ptr<bool>> semifuture_injectThriftFailure(
std::string area, std::string peerName);

folly::SemiFuture<std::unique_ptr<std::vector<thrift::Publication>>>
semifuture_dumpKvStoreKeys(
thrift::KeyDumpParams keyDumpParams,
Expand Down
7 changes: 7 additions & 0 deletions openr/kvstore/KvStoreParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ struct KvStoreParams {
std::chrono::milliseconds ttlDecr{Constants::kTtlDecrement};
// TTL for self-originated keys
std::chrono::milliseconds keyTtl{0};
std::chrono::milliseconds syncInitialBackoff{
Constants::kKvstoreSyncInitialBackoff};
std::chrono::milliseconds syncMaxBackoff{Constants::kKvstoreSyncMaxBackoff};

// TLS knob
bool enable_secure_thrift_client{false};
Expand All @@ -59,6 +62,10 @@ struct KvStoreParams {
*kvStoreConfig.ttl_decrement_ms())), /* TTL decrement factor */
keyTtl(std::chrono::milliseconds(
*kvStoreConfig.key_ttl_ms())), /*TTL for self-originated keys */
syncInitialBackoff(std::chrono::milliseconds(
*kvStoreConfig.sync_initial_backoff_ms())),
syncMaxBackoff(
std::chrono::milliseconds(*kvStoreConfig.sync_max_backoff_ms())),
enable_secure_thrift_client(
*kvStoreConfig.enable_secure_thrift_client()),
x509_cert_path(kvStoreConfig.x509_cert_path().to_optional()),
Expand Down
15 changes: 15 additions & 0 deletions openr/kvstore/KvStoreWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ KvStoreWrapper<ClientType>::setKeys(
return true;
}

/*
 * Ask the wrapped KvStore to inject a thrift failure for `peerName` in `area`
 * and wait for the outcome.
 *
 * @return true when the injection completed, false if the request raised an
 *         exception (which is logged).
 */
template <class ClientType>
bool
KvStoreWrapper<ClientType>::injectThriftFailure(
    AreaId const& area, std::string const& peerName) {
  try {
    // Wait on the future: an exception stored in it is only rethrown by
    // get(), so dropping the future would make the catch below unreachable
    // and always report success.
    kvStore_->semifuture_injectThriftFailure(area, peerName).get();
  } catch (std::exception const& e) {
    XLOG(ERR) << "Exception to thrift failure injection: "
              << folly::exceptionStr(e);
    return false;
  }

  return true;
}

template <class ClientType>
void
KvStoreWrapper<ClientType>::pushToKvStoreUpdatesQueue(
Expand Down
2 changes: 2 additions & 0 deletions openr/kvstore/KvStoreWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class KvStoreWrapper {
const std::vector<std::pair<std::string, thrift::Value>>& keyVals,
std::optional<std::vector<std::string>> nodeIds = std::nullopt);

// Request a thrift-failure injection for `peerName` in `area` on the wrapped
// KvStore. Returns false if an exception is caught while doing so, true
// otherwise.
bool injectThriftFailure(AreaId const& area, std::string const& peerName);

void
publishKvStoreSynced() {
kvStoreUpdatesQueue_.push(thrift::InitializationEvent::KVSTORE_SYNCED);
Expand Down
Loading

0 comments on commit 0476c96

Please sign in to comment.