Skip to content

Commit

Permalink
iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
eskedesu committed Feb 2, 2025
1 parent fc3f6e7 commit a1119f6
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 25 deletions.
2 changes: 0 additions & 2 deletions etcd/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
project(userver-etcd CXX)

# TODO: Add etcd setup

userver_module(etcd
SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}"
UTEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/src/*_test.cpp"
Expand Down
9 changes: 6 additions & 3 deletions etcd/include/userver/storages/etcd/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#include <vector>

#include <userver/clients/http/component.hpp>
#include <userver/concurrent/queue.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/storages/etcd/settings.hpp>
#include <userver/storages/etcd/watch_listener.hpp>
#include <userver/yaml_config/fwd.hpp>

USERVER_NAMESPACE_BEGIN
Expand All @@ -27,7 +29,7 @@ class Client {
virtual void Put(const std::string& key, const std::string& value) = 0;
[[nodiscard]] virtual std::vector<std::string> Range(const std::string& key) = 0;
virtual void DeleteRange(const std::string& key) = 0;
virtual void StartWatch(const std::string& key) = 0;
virtual WatchListener StartWatch(const std::string& key) = 0;
};

namespace impl {
Expand All @@ -39,15 +41,16 @@ class ClientImpl : public Client {
void Put(const std::string& key, const std::string& value) override;
[[nodiscard]] std::vector<std::string> Range(const std::string& key) override;
void DeleteRange(const std::string& key) override;
void StartWatch(const std::string& key) override;
WatchListener StartWatch(const std::string& key) override;
private:
[[nodiscard]] std::shared_ptr<clients::http::Response>
PerformEtcdRequest(const std::function<std::string(const std::string&)>& url_builder, const std::string& data);
[[nodiscard]] clients::http::StreamedResponse PerformStreamEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data);

clients::http::Client& http_client_;
userver::engine::TaskWithResult<void> watch_task_;
std::vector<userver::engine::TaskWithResult<void>> watch_tasks_;
std::vector<std::shared_ptr<concurrent::SpscQueue<KVEvent>>> watch_queues_;
const ClientSettings settings_;
};

Expand Down
19 changes: 19 additions & 0 deletions etcd/include/userver/storages/etcd/exceptions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <string>

#include <stdexcept>

USERVER_NAMESPACE_BEGIN

namespace storages::etcd {

/// @brief Base class for all etcd client exceptions
class EtcdError : public std::runtime_error {
public:
using std::runtime_error::runtime_error;
};

}

USERVER_NAMESPACE_END
25 changes: 25 additions & 0 deletions etcd/include/userver/storages/etcd/watch_listener.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <string>

#include <userver/concurrent/queue.hpp>

USERVER_NAMESPACE_BEGIN

namespace storages::etcd {

struct KVEvent final {
std::string key;
std::string value;
std::size_t version;
};

struct WatchListener final {
concurrent::SpscQueue<KVEvent>::Consumer consumer;

KVEvent GetEvent();
};

}

USERVER_NAMESPACE_END
63 changes: 43 additions & 20 deletions etcd/src/storages/etcd/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#include <userver/clients/http/streamed_response.hpp>
#include <userver/http/status_code.hpp>
#include <userver/concurrent/queue.hpp>
#include <userver/crypto/base64.hpp>
#include <userver/dynamic_config/value.hpp>
#include <userver/formats/json/value_builder.hpp>
Expand Down Expand Up @@ -105,6 +104,49 @@ void ClientImpl::DeleteRange(const std::string& key) {
auto response = PerformEtcdRequest(BuildDeleteRangeUrl, BuildDeleteRangeData(key));
}

WatchListener ClientImpl::StartWatch(const std::string& key) {

const auto data = BuildWatchData(key);
auto stream_response = PerformStreamEtcdRequest(BuildWatchUrl, BuildWatchData(key));
auto queue = concurrent::SpscQueue<KVEvent>::Create();
// TODO: add lock
watch_queues_.push_back(queue);
watch_tasks_.push_back(utils::Async(
"watch task",
[stream_response = std::move(stream_response), produser = queue->GetProducer()] mutable {
std::string body_part;
const auto deadline = engine::Deadline::FromDuration(std::chrono::seconds{100'000'000});
while (stream_response.ReadChunk(body_part, deadline)) {
const auto watch_response = formats::json::FromString(body_part);
LOG_ERROR() << watch_response;
if (!watch_response["result"].HasMember("events")) {
LOG_INFO() << "No events in watch part response, skipping";
continue;
}
for (const auto event : watch_response["result"]["events"]) {
if (!event.HasMember("kv")) {
continue;
}
const auto key = crypto::base64::Base64Decode(event["kv"]["key"].As<std::string>());
const auto value = crypto::base64::Base64Decode(event["kv"]["value"].As<std::string>());
const auto version = std::stoi(event["kv"]["version"].As<std::string>());
if (!produser.PushNoblock({
.key = key,
.value = value,
.version = version,
})) {
LOG_ERROR() << "PushNoblock failed";
return;
};
}
}
}
));
return WatchListener{
.consumer = queue->GetConsumer()
};
}

clients::http::StreamedResponse ClientImpl::PerformStreamEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data
){
Expand All @@ -125,25 +167,6 @@ clients::http::StreamedResponse ClientImpl::PerformStreamEtcdRequest(
}
}

void ClientImpl::StartWatch(const std::string& key) {

const auto data = BuildWatchData(key);
auto stream_response = PerformStreamEtcdRequest(BuildWatchUrl, BuildWatchData(key));

watch_task_ = utils::Async(
"watch task",
[stream_response = std::move(stream_response)] mutable {
std::string body_part;
std::string result;
const auto deadline = engine::Deadline::FromDuration(std::chrono::seconds{100'000'000});
while (stream_response.ReadChunk(body_part, deadline)) {
LOG_ERROR() << "Kek " << body_part;
result += body_part;
}
}
);
}

std::shared_ptr<clients::http::Response> ClientImpl::PerformEtcdRequest(
const std::function<std::string(const std::string&)>& url_builder, const std::string& data
) {
Expand Down
19 changes: 19 additions & 0 deletions etcd/src/storages/etcd/watch_listener.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include <userver/storages/etcd/watch_listener.hpp>

#include <userver/storages/etcd/exceptions.hpp>

USERVER_NAMESPACE_BEGIN

namespace storages::etcd {

KVEvent WatchListener::GetEvent() {
KVEvent event;
if (!consumer.Pop(event)) {
throw EtcdError("Consumer pop failed");
}
return event;
}

} // namespace formats::parse

USERVER_NAMESPACE_END

0 comments on commit a1119f6

Please sign in to comment.