Skip to content

Commit

Permalink
feat grpc: add GenericClient
Browse files Browse the repository at this point in the history
45c630765c628f16cd600ab26711d67e118c8767
  • Loading branch information
Anton3 committed Jul 19, 2024
1 parent c688be6 commit 9646b90
Show file tree
Hide file tree
Showing 43 changed files with 970 additions and 251 deletions.
8 changes: 8 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1771,11 +1771,13 @@
"grpc/handlers/proto/healthchecking/healthchecking.proto":"taxi/uservices/userver/grpc/handlers/proto/healthchecking/healthchecking.proto",
"grpc/handlers/src/ugrpc/server/health/health.cpp":"taxi/uservices/userver/grpc/handlers/src/ugrpc/server/health/health.cpp",
"grpc/handlers/src/ugrpc/server/health/test_test.cpp":"taxi/uservices/userver/grpc/handlers/src/ugrpc/server/health/test_test.cpp",
"grpc/include/userver/ugrpc/byte_buffer_utils.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/byte_buffer_utils.hpp",
"grpc/include/userver/ugrpc/client/channels.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/channels.hpp",
"grpc/include/userver/ugrpc/client/client_factory.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory.hpp",
"grpc/include/userver/ugrpc/client/client_factory_component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory_component.hpp",
"grpc/include/userver/ugrpc/client/exceptions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/exceptions.hpp",
"grpc/include/userver/ugrpc/client/fwd.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/fwd.hpp",
"grpc/include/userver/ugrpc/client/generic.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/generic.hpp",
"grpc/include/userver/ugrpc/client/impl/async_method_invocation.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/async_method_invocation.hpp",
"grpc/include/userver/ugrpc/client/impl/async_methods.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/async_methods.hpp",
"grpc/include/userver/ugrpc/client/impl/call_params.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/call_params.hpp",
Expand All @@ -1797,6 +1799,7 @@
"grpc/include/userver/ugrpc/impl/completion_queues.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/completion_queues.hpp",
"grpc/include/userver/ugrpc/impl/deadline_timepoint.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/deadline_timepoint.hpp",
"grpc/include/userver/ugrpc/impl/internal_tag_fwd.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/internal_tag_fwd.hpp",
"grpc/include/userver/ugrpc/impl/maybe_owned_string.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/maybe_owned_string.hpp",
"grpc/include/userver/ugrpc/impl/queue_runner.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/queue_runner.hpp",
"grpc/include/userver/ugrpc/impl/span.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/span.hpp",
"grpc/include/userver/ugrpc/impl/static_metadata.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/static_metadata.hpp",
Expand Down Expand Up @@ -1835,16 +1838,19 @@
"grpc/proto/tests/repeating_word_in_package_name.proto":"taxi/uservices/userver/grpc/proto/tests/repeating_word_in_package_name.proto",
"grpc/proto/tests/same_service_and_method_name.proto":"taxi/uservices/userver/grpc/proto/tests/same_service_and_method_name.proto",
"grpc/proto/tests/unit_test.proto":"taxi/uservices/userver/grpc/proto/tests/unit_test.proto",
"grpc/src/ugrpc/byte_buffer_utils.cpp":"taxi/uservices/userver/grpc/src/ugrpc/byte_buffer_utils.cpp",
"grpc/src/ugrpc/client/channels.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/channels.cpp",
"grpc/src/ugrpc/client/client_factory.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_factory.cpp",
"grpc/src/ugrpc/client/client_factory_component.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/client_factory_component.cpp",
"grpc/src/ugrpc/client/exceptions.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/exceptions.cpp",
"grpc/src/ugrpc/client/generic.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/generic.cpp",
"grpc/src/ugrpc/client/impl/async_method_invocation.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/async_method_invocation.cpp",
"grpc/src/ugrpc/client/impl/async_methods.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/async_methods.cpp",
"grpc/src/ugrpc/client/impl/call_params.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/call_params.cpp",
"grpc/src/ugrpc/client/impl/channel_cache.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/channel_cache.cpp",
"grpc/src/ugrpc/client/impl/client_configs.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_configs.cpp",
"grpc/src/ugrpc/client/impl/client_configs.hpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_configs.hpp",
"grpc/src/ugrpc/client/impl/client_data.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_data.cpp",
"grpc/src/ugrpc/client/impl/client_factory_config.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_factory_config.cpp",
"grpc/src/ugrpc/client/impl/client_factory_config.hpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_factory_config.hpp",
"grpc/src/ugrpc/client/impl/client_qos.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_qos.cpp",
Expand Down Expand Up @@ -1920,6 +1926,7 @@
"grpc/tests/src/deadline_metrics_test.cpp":"taxi/uservices/userver/grpc/tests/src/deadline_metrics_test.cpp",
"grpc/tests/src/deadline_test.cpp":"taxi/uservices/userver/grpc/tests/src/deadline_test.cpp",
"grpc/tests/src/error_test.cpp":"taxi/uservices/userver/grpc/tests/src/error_test.cpp",
"grpc/tests/src/generic_client_test.cpp":"taxi/uservices/userver/grpc/tests/src/generic_client_test.cpp",
"grpc/tests/src/logging_test.cpp":"taxi/uservices/userver/grpc/tests/src/logging_test.cpp",
"grpc/tests/src/middlewares_test.cpp":"taxi/uservices/userver/grpc/tests/src/middlewares_test.cpp",
"grpc/tests/src/serialization_test.cpp":"taxi/uservices/userver/grpc/tests/src/serialization_test.cpp",
Expand Down Expand Up @@ -4024,6 +4031,7 @@
"universal/src/utils/impl/disable_core_dumps.cpp":"taxi/uservices/userver/universal/src/utils/impl/disable_core_dumps.cpp",
"universal/src/utils/impl/internal_tag.hpp":"taxi/uservices/userver/universal/src/utils/impl/internal_tag.hpp",
"universal/src/utils/impl/projecting_view_test.cpp":"taxi/uservices/userver/universal/src/utils/impl/projecting_view_test.cpp",
"universal/src/utils/impl/source_location.cpp":"taxi/uservices/userver/universal/src/utils/impl/source_location.cpp",
"universal/src/utils/impl/source_location_test.cpp":"taxi/uservices/userver/universal/src/utils/impl/source_location_test.cpp",
"universal/src/utils/impl/static_registration.cpp":"taxi/uservices/userver/universal/src/utils/impl/static_registration.cpp",
"universal/src/utils/impl/transparent_hash_test.cpp":"taxi/uservices/userver/universal/src/utils/impl/transparent_hash_test.cpp",
Expand Down
6 changes: 6 additions & 0 deletions core/include/userver/concurrent/variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class Variable final {
return {mutex_, data_};
}

/// Useful for grabbing a reference to an object in a node-based container,
/// e.g. `std::unordered_map`. Values must support concurrent modification.
LockedPtr<std::shared_lock<Mutex>, Data> SharedMutableLockUnsafe() {
return {mutex_, data_};
}

LockedPtr<std::lock_guard<Mutex>, Data> Lock() { return {mutex_, data_}; }

LockedPtr<std::lock_guard<Mutex>, const Data> Lock() const {
Expand Down
8 changes: 5 additions & 3 deletions core/include/userver/utils/statistics/histogram_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ struct Access;
class HistogramView final {
public:
// trivially copyable
HistogramView(const HistogramView&) noexcept = default;
HistogramView& operator=(const HistogramView&) noexcept = default;
constexpr HistogramView(const HistogramView&) noexcept = default;
constexpr HistogramView& operator=(const HistogramView&) noexcept = default;

/// Returns the number of "normal" (non-"infinity") buckets.
std::size_t GetBucketCount() const noexcept;
Expand All @@ -48,7 +48,9 @@ class HistogramView final {
private:
friend struct impl::histogram::Access;

explicit HistogramView(const impl::histogram::Bucket* buckets) noexcept;
constexpr explicit HistogramView(
const impl::histogram::Bucket& buckets) noexcept
: buckets_(&buckets) {}

const impl::histogram::Bucket* buckets_;
};
Expand Down
28 changes: 22 additions & 6 deletions core/include/userver/utils/statistics/metric_value.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,34 @@ class MetricValue final {
public:
using RawType = std::variant<std::int64_t, double, Rate, HistogramView>;

/// Creates an unspecified metric value.
constexpr MetricValue() noexcept : value_(std::int64_t{0}) {}

/// Constructs MetricValue for tests.
/// @{
constexpr /*implicit*/ MetricValue(std::int64_t value) noexcept
: value_(value) {}

constexpr /*implicit*/ MetricValue(double value) noexcept : value_(value) {}

constexpr /*implicit*/ MetricValue(Rate value) noexcept : value_(value) {}

constexpr /*implicit*/ MetricValue(HistogramView value) noexcept
: value_(value) {}
/// @}

// trivially copyable
MetricValue(const MetricValue&) = default;
MetricValue& operator=(const MetricValue&) = default;

bool operator==(const MetricValue& other) const noexcept {
return value_ == other.value_;
friend bool operator==(const MetricValue& lhs,
const MetricValue& rhs) noexcept {
return lhs.value_ == rhs.value_;
}

bool operator!=(const MetricValue& other) const noexcept {
return value_ != other.value_;
friend bool operator!=(const MetricValue& lhs,
const MetricValue& rhs) noexcept {
return lhs.value_ != rhs.value_;
}

/// @brief Retrieve the value of an integer metric.
Expand Down Expand Up @@ -65,8 +83,6 @@ class MetricValue final {
}

/// @cond
MetricValue() noexcept : value_(std::int64_t{0}) {}

explicit MetricValue(RawType value) noexcept : value_(value) {}
/// @endcond

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class StripedRateCounter final {
return val_.Read() + offset_.load(std::memory_order_relaxed);
}

concurrent::StripedCounter val_;
USERVER_NAMESPACE::concurrent::StripedCounter val_;
std::atomic<std::uintptr_t> offset_{0};
};

Expand Down
5 changes: 0 additions & 5 deletions core/src/utils/statistics/histogram_view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ static_assert(std::is_trivially_copyable_v<HistogramView> &&
"HistogramView should fit in registers, because it is expected "
"to be passed around by value");

HistogramView::HistogramView(const impl::histogram::Bucket* buckets) noexcept
: buckets_(buckets) {
UASSERT(buckets);
}

std::size_t HistogramView::GetBucketCount() const noexcept {
UASSERT(buckets_);
return buckets_[0].upper_bound.size;
Expand Down
3 changes: 2 additions & 1 deletion core/src/utils/statistics/impl/histogram_view_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace utils::statistics::impl::histogram {

struct Access final {
static HistogramView MakeView(const Bucket* buckets) noexcept {
return HistogramView{buckets};
UASSERT(buckets);
return HistogramView{*buckets};
}

template <typename AnyHistogramView>
Expand Down
1 change: 1 addition & 0 deletions core/utest/include/userver/utils/statistics/testing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
/// @brief Utilities for analyzing emitted metrics in unit tests

#include <iosfwd>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
Expand Down
38 changes: 38 additions & 0 deletions grpc/include/userver/ugrpc/byte_buffer_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

/// @file userver/ugrpc/byte_buffer_utils.hpp
/// @brief Helper functions for working with `grpc::ByteBuffer`

#include <cstddef>

#include <google/protobuf/message.h>
#include <grpcpp/support/byte_buffer.h>

USERVER_NAMESPACE_BEGIN

namespace ugrpc {

/// @see @ref SerializeToByteBuffer
inline constexpr std::size_t kDefaultSerializeBlockSize = 4096;

/// @brief Serialize a Protobuf message to the wire format.
/// @param message the message to serialize
/// @param block_size can be used for performance tuning, too small chunk size
/// results in extra allocations, too large chunk size results in wasting memory
/// @throws std::runtime_error on serialization errors (supposedly only happens
/// on extremely rare allocation failures or proto reflection malfunctioning).
grpc::ByteBuffer SerializeToByteBuffer(
const ::google::protobuf::Message& message,
std::size_t block_size = kDefaultSerializeBlockSize);

/// @brief Parse a Protobuf message from the wire format.
/// @param buffer the buffer that might be tempered with during deserialization
/// @param message will contain the parsing result on success
/// @returns `true` on success, `false` if @a buffer does not contain a valid
/// message, according to the derived type of @a message
bool ParseFromByteBuffer(grpc::ByteBuffer&& buffer,
::google::protobuf::Message& message);

} // namespace ugrpc

USERVER_NAMESPACE_END
23 changes: 12 additions & 11 deletions grpc/include/userver/ugrpc/client/client_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ struct ClientFactorySettings final {
std::size_t channel_count{1};
};

/// @brief Creates generated gRPC clients. Has a minimal built-in channel cache:
/// @ingroup userver_clients
///
/// @brief Creates gRPC clients. Has a minimal built-in channel cache:
/// as long as a channel to the same endpoint is used somewhere, the same
/// channel is given out.
class ClientFactory final {
Expand Down Expand Up @@ -84,17 +86,16 @@ class ClientFactory final {
template <typename Client>
Client ClientFactory::MakeClient(const std::string& client_name,
const std::string& endpoint) {
auto& statistics = client_statistics_storage_.GetServiceStatistics(
Client::GetMetadata(), endpoint);

Middlewares mws;
mws.reserve(mws_.size());
for (const auto& mw_factory : mws_)
mws.push_back(mw_factory->GetMiddleware(client_name));

return Client(impl::ClientParams{
client_name, std::move(mws), queue_, statistics,
GetChannel(client_name, endpoint), config_source_, testsuite_grpc_});
client_name,
endpoint,
impl::InstantiateMiddlewares(mws_, client_name),
queue_,
client_statistics_storage_,
GetChannel(client_name, endpoint),
config_source_,
testsuite_grpc_,
});
}

} // namespace ugrpc::client
Expand Down
88 changes: 88 additions & 0 deletions grpc/include/userver/ugrpc/client/generic.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#pragma once

/// @file userver/ugrpc/client/generic.hpp
/// @brief @copybrief ugrpc::client::GenericClient

#include <optional>
#include <string_view>

#include <grpcpp/client_context.h>
#include <grpcpp/support/byte_buffer.h>

#include <userver/ugrpc/client/impl/client_data.hpp>
#include <userver/ugrpc/client/qos.hpp>
#include <userver/ugrpc/client/rpc.hpp>

USERVER_NAMESPACE_BEGIN

namespace ugrpc::client {

struct GenericOptions {
/// Client QOS for this call. Note that there is no QOS dynamic config by
/// default, so unless a timeout is specified here, only the deadline
/// propagation mechanism will affect the gRPC deadline.
Qos qos{};

/// If non-`nullopt`, metrics are accounted for specified fake call name.
/// If `nullopt` (by default), writes a set of metrics per real call name.
/// If the microservice serves as a proxy and has untrusted clients, it is
/// a good idea to have this option set to non-`nullopt` to avoid
/// the situations where an upstream client can spam various RPCs with
/// non-existent names, which leads to this microservice spamming RPCs
/// with non-existent names, which leads to creating storage for infinite
/// metrics and causes OOM.
std::optional<std::string_view> metrics_call_name{};
};

/// @ingroup userver_clients
///
/// @brief Allows to talk to gRPC services (generic and normal) using dynamic
/// method names.
///
/// Created using @ref ClientFactory::MakeClient.
///
/// `call_name` must be in the format `full.path.to.TheService/MethodName`.
/// Note that unlike in base grpc++, there must be no initial `/` character.
///
/// The API is mainly intended for proxies, where the request-response body is
/// passed unchanged, with settings taken solely from the RPC metadata.
/// In cases where the code needs to operate on the actual messages,
/// serialization of requests and responses is left as an excercise to the user.
///
/// Middlewares are customizable and are applied as usual, except that no
/// message hooks are called, meaning that there won't be any logs of messages
/// from the default middleware.
///
/// Metrics are written per-method by default, which causes OOM in some corner
/// cases, for details see @ref GenericOptions::metrics_call_name.
///
/// ## Example GenericClient usage with known message types
///
/// @snippet grpc/tests/src/generic_client_test.cpp sample
class GenericClient final {
public:
GenericClient(GenericClient&&) noexcept = default;
GenericClient& operator=(GenericClient&&) noexcept = delete;

/// Initiate a `single request -> single response` RPC with the given name.
client::UnaryCall<grpc::ByteBuffer> UnaryCall(
std::string_view call_name, const grpc::ByteBuffer& request,
std::unique_ptr<grpc::ClientContext> context =
std::make_unique<grpc::ClientContext>(),
const GenericOptions& options = {}) const;

/// @cond
// For internal use only.
explicit GenericClient(impl::ClientParams&&);
/// @endcond

private:
template <typename Client>
friend impl::ClientData& impl::GetClientData(Client& client);

impl::ClientData impl_;
};

} // namespace ugrpc::client

USERVER_NAMESPACE_END
24 changes: 2 additions & 22 deletions grpc/include/userver/ugrpc/client/impl/async_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
#include <userver/ugrpc/client/impl/call_params.hpp>
#include <userver/ugrpc/impl/async_method_invocation.hpp>
#include <userver/ugrpc/impl/maybe_owned_string.hpp>
#include <userver/ugrpc/impl/statistics_scope.hpp>

USERVER_NAMESPACE_BEGIN
Expand All @@ -45,27 +46,6 @@ using RawReaderWriter =
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
/// @}

/// @{
/// @brief Helper type aliases for stub member function pointers
template <typename Stub, typename Request, typename Response>
using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
grpc::ClientContext*, const Request&, grpc::CompletionQueue*);

template <typename Stub, typename Request, typename Response>
using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
const Request&,
grpc::CompletionQueue*);

template <typename Stub, typename Request, typename Response>
using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
Response*,
grpc::CompletionQueue*);

template <typename Stub, typename Request, typename Response>
using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
grpc::ClientContext*, grpc::CompletionQueue*);
/// @}

struct RpcConfigValues final {
explicit RpcConfigValues(const dynamic_config::Snapshot& config);

Expand Down Expand Up @@ -156,7 +136,7 @@ class RpcData final {
private:
std::unique_ptr<grpc::ClientContext> context_;
std::string client_name_;
std::string_view call_name_;
ugrpc::impl::MaybeOwnedString call_name_;
bool writes_finished_{false};
bool is_finished_{false};
bool is_deadline_propagated_{false};
Expand Down
Loading

0 comments on commit 9646b90

Please sign in to comment.