Skip to content

Commit

Permalink
iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
eskedesu committed Feb 1, 2025
1 parent 3c1e377 commit fc3f6e7
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 50 deletions.
40 changes: 32 additions & 8 deletions etcd/include/userver/storages/etcd/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,56 @@
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <userver/clients/http/component.hpp>
#include <userver/engine/shared_mutex.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/storages/etcd/settings.hpp>
#include <userver/yaml_config/fwd.hpp>

USERVER_NAMESPACE_BEGIN

namespace storages::etcd {

class Client final {
namespace impl {

class ClientImpl;

}

class Client {
public:
Client(clients::http::Client& http_client, ClientSettings settings);
void Put(const std::string& key, const std::string& value);
[[nodiscard]] std::vector<std::string> Range(const std::string& key);
void DeleteRange(const std::string& key);
virtual ~Client() = default;
virtual void Put(const std::string& key, const std::string& value) = 0;
[[nodiscard]] virtual std::vector<std::string> Range(const std::string& key) = 0;
virtual void DeleteRange(const std::string& key) = 0;
virtual void StartWatch(const std::string& key) = 0;
};

namespace impl {

class ClientImpl : public Client {
public:
~ClientImpl() override = default;
ClientImpl(clients::http::Client& http_client, ClientSettings settings);
void Put(const std::string& key, const std::string& value) override;
[[nodiscard]] std::vector<std::string> Range(const std::string& key) override;
void DeleteRange(const std::string& key) override;
void StartWatch(const std::string& key) override;
private:
[[nodiscard]] std::shared_ptr<clients::http::Response>
PerformEtcdRequest(const std::function<std::string(const std::string&)>& url_builder, const std::string& data);
[[nodiscard]] clients::http::StreamedResponse PerformStreamEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data);

clients::http::Client& http_client_;
engine::SharedMutex endpoints_shared_mutex_;
ClientSettings settings_;
userver::engine::TaskWithResult<void> watch_task_;
const ClientSettings settings_;
};

}

using ClientPtr = std::shared_ptr<Client>;

}
Expand Down
6 changes: 3 additions & 3 deletions etcd/include/userver/storages/etcd/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ USERVER_NAMESPACE_BEGIN
namespace storages::etcd {

struct ClientSettings final {
std::vector<std::string> endpoints;
std::uint32_t retries;
std::chrono::microseconds request_timeout_ms;
const std::vector<std::string> endpoints;
const std::uint32_t retries;
const std::chrono::microseconds request_timeout_ms;
};

}
Expand Down
110 changes: 78 additions & 32 deletions etcd/src/storages/etcd/client.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
#include <cstddef>
#include <memory>
#include <userver/storages/etcd/client.hpp>

#include <iostream>
#include <string>

#include <fmt/format.h>

#include <userver/clients/http/streamed_response.hpp>
#include <userver/http/status_code.hpp>
#include <userver/concurrent/queue.hpp>
#include <userver/crypto/base64.hpp>
#include <userver/dynamic_config/value.hpp>
#include <userver/formats/json/value_builder.hpp>
#include <userver/formats/json/string_builder.hpp>
#include <userver/formats/parse/common_containers.hpp>
#include <userver/http/common_headers.hpp>
#include <userver/logging/log.hpp>
#include <userver/utils/rand.hpp>
#include <userver/yaml_config/yaml_config.hpp>
#include <userver/utils/async.hpp>

USERVER_NAMESPACE_BEGIN

Expand All @@ -25,61 +32,61 @@ std::string BuildPutUrl(const std::string& service_url) {
}

std::string BuildPutData(const std::string& key, const std::string& value) {
formats::json::StringBuilder sb;
{
formats::json::StringBuilder::ObjectGuard guard{sb};
sb.Key("key");
sb.WriteString(crypto::base64::Base64Encode(key));
sb.Key("value");
sb.WriteString(crypto::base64::Base64Encode(value));
}
return sb.GetString();
formats::json::ValueBuilder builder;
builder["key"] = crypto::base64::Base64Encode(key);
builder["value"] = crypto::base64::Base64Encode(value);
return formats::json::ToString(builder.ExtractValue());
}

std::string BuildRangeUrl(const std::string& service_url) {
return fmt::format("{}/v3/kv/range", service_url);
}

std::string BuildRangeData(const std::string& key) {
formats::json::StringBuilder sb;
{
formats::json::StringBuilder::ObjectGuard guard{sb};
sb.Key("key");
sb.WriteString(crypto::base64::Base64Encode(key));
}
return sb.GetString();
formats::json::ValueBuilder builder;
builder["key"] = crypto::base64::Base64Encode(key);
return formats::json::ToString(builder.ExtractValue());
}

std::string BuildDeleteRangeUrl(const std::string& service_url) {
return fmt::format("{}/v3/kv/deleterange", service_url);
}

std::string BuildDeleteRangeData(const std::string& key) {
formats::json::StringBuilder sb;
{
formats::json::StringBuilder::ObjectGuard guard{sb};
sb.Key("key");
sb.WriteString(crypto::base64::Base64Encode(key));
}
return sb.GetString();
formats::json::ValueBuilder builder;
builder["key"] = crypto::base64::Base64Encode(key);
return formats::json::ToString(builder.ExtractValue());
}


std::string BuildWatchUrl(const std::string& service_url) {
return fmt::format("{}/v3/watch", service_url);
}

bool ShouldRetry(std::shared_ptr<clients::http::Response> response) {
std::string BuildWatchData(const std::string& key) {
formats::json::ValueBuilder builder;
builder["create_request"]["key"] = crypto::base64::Base64Encode(key);
return formats::json::ToString(builder.ExtractValue());
}

bool ShouldRetry(const http::StatusCode status_code) {
return false;
}

}

Client::Client(clients::http::Client& http_client, ClientSettings settings)
namespace impl {

ClientImpl::ClientImpl(clients::http::Client& http_client, ClientSettings settings)
: http_client_(http_client),
settings_(settings) {
}

void Client::Put(const std::string& key, const std::string& value) {
void ClientImpl::Put(const std::string& key, const std::string& value) {
auto response = PerformEtcdRequest(BuildPutUrl, BuildPutData(key, value));
}

std::vector<std::string> Client::Range(const std::string& key) {
std::vector<std::string> ClientImpl::Range(const std::string& key) {
auto response = PerformEtcdRequest(BuildRangeUrl, BuildRangeData(key));

const auto json_body = formats::json::FromString(response->body());
Expand All @@ -94,16 +101,53 @@ std::vector<std::string> Client::Range(const std::string& key) {
return values;
}

void Client::DeleteRange(const std::string& key) {
void ClientImpl::DeleteRange(const std::string& key) {
auto response = PerformEtcdRequest(BuildDeleteRangeUrl, BuildDeleteRangeData(key));
}

std::shared_ptr<clients::http::Response> Client::PerformEtcdRequest(
clients::http::StreamedResponse ClientImpl::PerformStreamEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data
){
auto endpoints = settings_.endpoints;
utils::Shuffle(endpoints);

for (const auto& endpoint : endpoints) {
const auto queue = concurrent::StringStreamQueue::Create();
auto stream_response = http_client_
.CreateRequest()
.post(url_builder(endpoint), data)
.retry(settings_.retries)
.timeout(1'000'000'000)
.async_perform_stream_body(queue);
if (!ShouldRetry(stream_response.StatusCode())) {
return stream_response;
}
}
}

void ClientImpl::StartWatch(const std::string& key) {

const auto data = BuildWatchData(key);
auto stream_response = PerformStreamEtcdRequest(BuildWatchUrl, BuildWatchData(key));

watch_task_ = utils::Async(
"watch task",
[stream_response = std::move(stream_response)] mutable {
std::string body_part;
std::string result;
const auto deadline = engine::Deadline::FromDuration(std::chrono::seconds{100'000'000});
while (stream_response.ReadChunk(body_part, deadline)) {
LOG_ERROR() << "Kek " << body_part;
result += body_part;
}
}
);
}

std::shared_ptr<clients::http::Response> ClientImpl::PerformEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data
) {
endpoints_shared_mutex_.lock_shared();
auto endpoints = settings_.endpoints;
endpoints_shared_mutex_.unlock_shared();
utils::Shuffle(endpoints);

for (const auto& endpoint : endpoints) {
Expand All @@ -114,12 +158,14 @@ std::shared_ptr<clients::http::Response> Client::PerformEtcdRequest(
.timeout(settings_.request_timeout_ms.count())
.perform();
LOG_DEBUG() << "Response: " << formats::json::FromString(response->body());
if (!ShouldRetry(response)) {
if (!ShouldRetry(response->status_code())) {
return response;
}
}
}

}

} // namespace storages::etcd

USERVER_NAMESPACE_END
2 changes: 1 addition & 1 deletion etcd/src/storages/etcd/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace storages::etcd {
Component::Component(const components::ComponentConfig& config, const components::ComponentContext& context)
:
ComponentBase(config, context),
etcd_client_ptr_(std::make_shared<Client>(
etcd_client_ptr_(std::make_shared<impl::ClientImpl>(
context.FindComponent<components::HttpClient>().GetHttpClient(),
config.As<ClientSettings>()
)) {}
Expand Down
10 changes: 5 additions & 5 deletions etcd/src/storages/etcd/settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ constexpr std::chrono::milliseconds kDefaultRequestTimeout{1'000};
namespace formats::parse {

storages::etcd::ClientSettings Parse(const yaml_config::YamlConfig& cofig, To<storages::etcd::ClientSettings>) {
storages::etcd::ClientSettings result;
result.endpoints = cofig["endpoints"].As<std::vector<std::string>>(result.endpoints);
result.retries = cofig["retries"].As<std::uint32_t>(storages::etcd::kDefaultRetries);
result.request_timeout_ms = cofig["request_timeout_ms"].As<std::chrono::milliseconds>(storages::etcd::kDefaultRequestTimeout);
return result;
return storages::etcd::ClientSettings {
.endpoints = cofig["endpoints"].As<std::vector<std::string>>(),
.retries = cofig["retries"].As<std::uint32_t>(storages::etcd::kDefaultRetries),
.request_timeout_ms = cofig["request_timeout_ms"].As<std::chrono::milliseconds>(storages::etcd::kDefaultRequestTimeout),
};
}

} // namespace formats::parse
Expand Down
2 changes: 1 addition & 1 deletion testsuite/pytest_plugins/pytest_userver/plugins/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def patch_config(config, config_vars):
return
http_client = components['http-client'] or {}
http_client['testsuite-enabled'] = True
http_client['testsuite-timeout'] = '10s'
# http_client['testsuite-timeout'] = '30s'

allowed_urls = [mockserver_info.base_url]
if mockserver_ssl_info:
Expand Down

0 comments on commit fc3f6e7

Please sign in to comment.