Skip to content

Commit

Permalink
feat grpc-client: move deadline propagation to middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriySud committed Jul 5, 2023
1 parent 5afd699 commit 799cc6e
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 95 deletions.
2 changes: 2 additions & 0 deletions grpc/functional_tests/basic_chaos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <userver/components/minimal_server_component_list.hpp>
#include <userver/testsuite/testsuite_support.hpp>
#include <userver/ugrpc/client/middlewares/deadline_propagation/component.hpp>
#include <userver/ugrpc/client/middlewares/log/component.hpp>
#include <userver/ugrpc/server/middlewares/deadline_propagation/component.hpp>
#include <userver/ugrpc/server/middlewares/log/component.hpp>
Expand All @@ -19,6 +20,7 @@ int main(int argc, char* argv[]) {
.Append<ugrpc::server::middlewares::log::Component>()
.Append<ugrpc::server::middlewares::deadline_propagation::Component>()
.Append<ugrpc::client::middlewares::log::Component>()
.Append<ugrpc::client::middlewares::deadline_propagation::Component>()
.Append<ugrpc::client::ClientFactoryComponent>()
.Append<samples::GreeterServiceComponent>()
.Append<samples::GreeterClient>()
Expand Down
2 changes: 2 additions & 0 deletions grpc/functional_tests/basic_chaos/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ components_manager:
channel-args: {}
middlewares:
- grpc-client-logging
- grpc-client-deadline-propagation
greeter-client:
endpoint: '[::]:8081'

Expand All @@ -56,6 +57,7 @@ components_manager:

testsuite-support:
grpc-client-logging:
grpc-client-deadline-propagation:

default_task_processor: main-task-processor
task_processors:
Expand Down
2 changes: 2 additions & 0 deletions grpc/functional_tests/metrics/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <userver/yaml_config/merge_schemas.hpp>

#include <userver/ugrpc/client/client_factory_component.hpp>
#include <userver/ugrpc/client/middlewares/deadline_propagation/component.hpp>
#include <userver/ugrpc/client/middlewares/log/component.hpp>
#include <userver/ugrpc/server/middlewares/deadline_propagation/component.hpp>
#include <userver/ugrpc/server/middlewares/log/component.hpp>
Expand Down Expand Up @@ -121,6 +122,7 @@ int main(int argc, const char* const argv[]) {
.Append<ugrpc::server::middlewares::log::Component>()
.Append<ugrpc::server::middlewares::deadline_propagation::Component>()
.Append<ugrpc::client::middlewares::log::Component>()
.Append<ugrpc::client::middlewares::deadline_propagation::Component>()
.Append<ugrpc::client::ClientFactoryComponent>()
.Append<functional_tests::GreeterClient>()
.Append<functional_tests::GreeterServiceComponent>();
Expand Down
2 changes: 2 additions & 0 deletions grpc/functional_tests/metrics/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ components_manager:
channel-args: {}
middlewares:
- grpc-client-logging
- grpc-client-deadline-propagation
greeter-client:
endpoint: '[::]:8081'

Expand All @@ -62,6 +63,7 @@ components_manager:

testsuite-support:
grpc-client-logging:
grpc-client-deadline-propagation:

default_task_processor: main-task-processor
task_processors:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

/// @file userver/ugrpc/client/middlewares/deadline_propagation/component.hpp
/// @brief @copybrief
/// ugrpc::client::middlewares::deadline_propagation::Component

#include <userver/ugrpc/client/middlewares/base.hpp>

USERVER_NAMESPACE_BEGIN

/// Client logging middleware
namespace ugrpc::client::middlewares::deadline_propagation {

/// @ingroup userver_components
///
/// @brief Component for gRPC client deadline_propagation. Update deadline
/// from TaskInheritedData if it exists and more strict than
/// context deadline.

class Component final : public MiddlewareComponentBase {
public:
static constexpr std::string_view kName = "grpc-client-deadline-propagation";

Component(const components::ComponentConfig& config,
const components::ComponentContext& context);

std::shared_ptr<const MiddlewareFactoryBase> GetMiddlewareFactory() override;

static yaml_config::Schema GetStaticConfigSchema();
};

} // namespace ugrpc::client::middlewares::deadline_propagation

USERVER_NAMESPACE_END
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

/// @file userver/ugrpc/client/logging/component.hpp
/// @file userver/ugrpc/client/middleware/log/component.hpp
/// @brief @copybrief ugrpc::client::middlewares::log::Component

#include <userver/ugrpc/client/middlewares/base.hpp>
Expand Down
46 changes: 32 additions & 14 deletions grpc/include/userver/ugrpc/client/rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <userver/ugrpc/client/impl/channel_cache.hpp>
#include <userver/ugrpc/client/middlewares/fwd.hpp>
#include <userver/ugrpc/impl/deadline_timepoint.hpp>
#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
#include <userver/ugrpc/impl/statistics_scope.hpp>

USERVER_NAMESPACE_BEGIN
Expand Down Expand Up @@ -108,6 +109,14 @@ class CallAnyBase {
/// @returns RPC name
std::string_view GetCallName() const;

/// @returns RPC span
tracing::Span& GetSpan();

/// @cond
// For internal use only
impl::RpcData& GetData(ugrpc::impl::InternalTag);
/// @endcond

protected:
impl::RpcData& GetData();

Expand Down Expand Up @@ -465,12 +474,15 @@ InputStream<Response>::InputStream(
impl::CallParams&& params, Stub& stub,
impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
const Request& req)
: CallAnyBase(std::move(params)),
stream_((stub.*prepare_func)(&GetData().GetContext(), req,
&GetData().GetQueue())) {
: CallAnyBase(std::move(params)) {
CallMiddlewares(
GetData().GetMiddlewares(), *this,
[this] { impl::StartCall(*stream_, GetData()); }, &req);
[&] {
stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
&GetData().GetQueue());
impl::StartCall(*stream_, GetData());
},
&req);
GetData().SetWritesFinished();
}

Expand All @@ -492,14 +504,17 @@ OutputStream<Request, Response>::OutputStream(
impl::CallParams&& params, Stub& stub,
impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
: CallAnyBase(std::move(params)),
final_response_(std::make_unique<Response>()),
// 'final_response_' will be filled upon successful 'Finish' async call
stream_((stub.*prepare_func)(&GetData().GetContext(),
final_response_.get(),
&GetData().GetQueue())) {
final_response_(std::make_unique<Response>()) {
CallMiddlewares(
GetData().GetMiddlewares(), *this,
[this] { impl::StartCall(*stream_, GetData()); }, nullptr);
[&] {
// 'final_response_' will be filled upon successful 'Finish' async call
stream_ =
(stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
&GetData().GetQueue());
impl::StartCall(*stream_, GetData());
},
nullptr);
}

template <typename Request, typename Response>
Expand Down Expand Up @@ -540,12 +555,15 @@ template <typename Stub>
BidirectionalStream<Request, Response>::BidirectionalStream(
impl::CallParams&& params, Stub& stub,
impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
: CallAnyBase(std::move(params)),
stream_((stub.*prepare_func)(&GetData().GetContext(),
&GetData().GetQueue())) {
: CallAnyBase(std::move(params)) {
CallMiddlewares(
GetData().GetMiddlewares(), *this,
[this] { impl::StartCall(*stream_, GetData()); }, nullptr);
[&] {
stream_ = (stub.*prepare_func)(&GetData().GetContext(),
&GetData().GetQueue());
impl::StartCall(*stream_, GetData());
},
nullptr);
}

template <typename Request, typename Response>
Expand Down
18 changes: 9 additions & 9 deletions grpc/include/userver/ugrpc/server/rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CallAnyBase {
/// @brief Name of the call. Consists of service and method names
std::string_view GetCallName() const { return params_.call_name; }

tracing::Span& GetCallSpan() { return params_.call_span; }
tracing::Span& GetSpan() { return params_.call_span; }

/// @cond
// For internal use only
Expand Down Expand Up @@ -282,7 +282,7 @@ void UnaryCall<Response>::Finish(const Response& response) {
LogFinish(grpc::Status::OK);
impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
Statistics().OnExplicitFinish(grpc::StatusCode::OK);
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), grpc::Status::OK);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
}

template <typename Response>
Expand All @@ -292,7 +292,7 @@ void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
LogFinish(status);
impl::FinishWithError(stream_, status, GetCallName());
Statistics().OnExplicitFinish(status.error_code());
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Request, typename Response>
Expand Down Expand Up @@ -328,7 +328,7 @@ void InputStream<Request, Response>::Finish(const Response& response) {
LogFinish(grpc::Status::OK);
impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
Statistics().OnExplicitFinish(grpc::StatusCode::OK);
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), grpc::Status::OK);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
}

template <typename Request, typename Response>
Expand All @@ -341,7 +341,7 @@ void InputStream<Request, Response>::FinishWithError(
LogFinish(status);
impl::FinishWithError(stream_, status, GetCallName());
Statistics().OnExplicitFinish(status.error_code());
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Response>
Expand Down Expand Up @@ -381,7 +381,7 @@ void OutputStream<Response>::Finish() {
LogFinish(status);
impl::Finish(stream_, status, GetCallName());
Statistics().OnExplicitFinish(grpc::StatusCode::OK);
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Response>
Expand All @@ -393,7 +393,7 @@ void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
LogFinish(status);
impl::Finish(stream_, status, GetCallName());
Statistics().OnExplicitFinish(status.error_code());
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Response>
Expand Down Expand Up @@ -456,7 +456,7 @@ void BidirectionalStream<Request, Response>::Finish() {
LogFinish(status);
impl::Finish(stream_, status, GetCallName());
Statistics().OnExplicitFinish(grpc::StatusCode::OK);
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Request, typename Response>
Expand All @@ -469,7 +469,7 @@ void BidirectionalStream<Request, Response>::FinishWithError(
LogFinish(status);
impl::Finish(stream_, status, GetCallName());
Statistics().OnExplicitFinish(status.error_code());
ugrpc::impl::UpdateSpanWithStatus(GetCallSpan(), status);
ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
}

template <typename Request, typename Response>
Expand Down
3 changes: 0 additions & 3 deletions grpc/src/tests/deadline_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ class DeadlineStatsTests
true);
experiments_.Set(utils::impl::kGrpcServerDeadlinePropagationExperiment,
true);
GetServerMiddlewares().push_back(
std::make_shared<
ugrpc::server::middlewares::deadline_propagation::Middleware>());
}

void BeSlow() { GetService().SetWaitDeadline(true); }
Expand Down
50 changes: 26 additions & 24 deletions grpc/src/tests/deadline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <userver/utils/impl/userver_experiments.hpp>

#include <ugrpc/client/impl/client_configs.hpp>
#include <ugrpc/client/middlewares/deadline_propagation/middleware.hpp>
#include <ugrpc/server/impl/server_configs.hpp>
#include <ugrpc/server/middlewares/deadline_propagation/middleware.hpp>
#include <userver/ugrpc/client/exceptions.hpp>
Expand Down Expand Up @@ -132,16 +133,13 @@ class GrpcDeadlinePropagation
: public GrpcServiceFixtureSimple<UnitTestDeadlinePropagationService> {
public:
using ClientType = sample::ugrpc::UnitTestServiceClient;

GrpcDeadlinePropagation()
: client_deadline_(
engine::Deadline::FromDuration(helpers::kShortTimeout)),
long_deadline_(engine::Deadline::FromDuration(helpers::kLongTimeout)),
client_(MakeClient<ClientType>()) {
helpers::InitTaskInheritedDeadline(client_deadline_);
ExtendDynamicConfig({
{ugrpc::client::impl::kEnforceClientTaskDeadline, true},
{ugrpc::server::impl::kServerCancelTaskByDeadline, true},
});
experiments_.Set(utils::impl::kGrpcClientDeadlinePropagationExperiment,
true);
experiments_.Set(utils::impl::kGrpcServerDeadlinePropagationExperiment,
Expand Down Expand Up @@ -328,15 +326,8 @@ class UnitTestInheritedDeadline final
engine::Deadline client_deadline_;
};

class GrpcTestInheritedDedline
: public GrpcServiceFixtureSimple<UnitTestInheritedDeadline> {
public:
GrpcTestInheritedDedline() {
GetServerMiddlewares().push_back(
std::make_shared<
ugrpc::server::middlewares::deadline_propagation::Middleware>());
}
};
using GrpcTestInheritedDedline =
GrpcServiceFixtureSimple<UnitTestInheritedDeadline>;

} // namespace

Expand All @@ -362,22 +353,33 @@ namespace {

class UnitTestClientNotSend final : public sample::ugrpc::UnitTestServiceBase {
public:
void SayHello(SayHelloCall& call,
sample::ugrpc::GreetingRequest&& request) override {
UASSERT(false);
sample::ugrpc::GreetingResponse response;
response.set_name("Hello " + request.name());

call.Finish(response);
void SayHello(SayHelloCall& /*call*/,
sample::ugrpc::GreetingRequest&& /*request*/) override {
FAIL();
}
};

using GrpcTestClientNotSendData =
GrpcServiceFixtureSimple<UnitTestClientNotSend>;
class GrpcTestClientNotSendData
: public GrpcServiceFixtureSimple<UnitTestClientNotSend> {
public:
using ClientType = sample::ugrpc::UnitTestServiceClient;

GrpcTestClientNotSendData() : client_(MakeClient<ClientType>()) {
experiments_.Set(utils::impl::kGrpcClientDeadlinePropagationExperiment,
true);
experiments_.Set(utils::impl::kGrpcServerDeadlinePropagationExperiment,
true);
}
ClientType& Client() { return client_; }

private:
ClientType client_;
utils::impl::UserverExperimentsScope experiments_;
};

} // namespace

UTEST_F(GrpcDeadlinePropagation, TestClientDoNotStartCallWithoutDeadline) {
UTEST_F(GrpcTestClientNotSendData, TestClientDoNotStartCallWithoutDeadline) {
auto task_deadline = engine::Deadline::FromDuration(helpers::kShortTimeout);
helpers::InitTaskInheritedDeadline(task_deadline);

Expand All @@ -393,7 +395,7 @@ UTEST_F(GrpcDeadlinePropagation, TestClientDoNotStartCallWithoutDeadline) {
UEXPECT_THROW(in = call.Finish(), ugrpc::client::DeadlineExceededError);
}

UTEST_F(GrpcDeadlinePropagation, TestClientDoNotStartCallWithDeadline) {
UTEST_F(GrpcTestClientNotSendData, TestClientDoNotStartCallWithDeadline) {
auto task_deadline = engine::Deadline::FromDuration(helpers::kShortTimeout);
helpers::InitTaskInheritedDeadline(task_deadline);

Expand Down
Loading

0 comments on commit 799cc6e

Please sign in to comment.