Skip to content

Commit

Permalink
Implemented but not tested
Browse files Browse the repository at this point in the history
  • Loading branch information
domwst committed Feb 18, 2024
1 parent d77fd47 commit 0592900
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 24 deletions.
10 changes: 10 additions & 0 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ydb/library/yql/public/issue/yql_issue_manager.h>
#include <ydb/library/aclib/aclib.h>

#include <ydb/core/jaeger_tracing/request_discriminator.h>
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/grpc_streaming/grpc_streaming.h>
#include <ydb/core/tx/scheme_board/events.h>
Expand Down Expand Up @@ -347,6 +348,7 @@ struct TRequestAuxSettings {
TRateLimiterMode RlMode = TRateLimiterMode::Off;
void (*CustomAttributeProcessor)(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, ICheckerIface*) = nullptr;
TAuditMode AuditMode = TAuditMode::Off;
NJaegerTracing::TRequestDiscriminator RequestDiscriminator = {};
};

// grpc_request_proxy part
Expand All @@ -368,6 +370,10 @@ class IRequestProxyCtx : public virtual IRequestCtxBase {
virtual void LegacyFinishSpan() = 0;

// Used for per-type sampling
virtual const NJaegerTracing::TRequestDiscriminator& GetRequestDiscriminator() const {
return NJaegerTracing::TRequestDiscriminator::EMPTY;
};

virtual const TString& GetInternalRequestType() const = 0;

// validation
Expand Down Expand Up @@ -1460,6 +1466,10 @@ class TGrpcRequestCall
}
}

const NJaegerTracing::TRequestDiscriminator& GetRequestDiscriminator() const override {
return AuxSettings.RequestDiscriminator;
}

// IRequestCtxBaseMtSafe
//
bool IsAuditable() const override {
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,11 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) {
}

void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
auto requestType = ctx.GetInternalRequestType();
if (requestType.empty()) {
return;
}
NWilson::TTraceId traceId;
if (const auto otelHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER)) {
traceId = NWilson::TTraceId::FromTraceparentHeader(otelHeader.GetRef());
}
TracingControl->HandleTracing(traceId, NJaegerTracing::TRequestDiscriminator{});
TracingControl->HandleTracing(traceId, ctx.GetRequestDiscriminator());
if (traceId) {
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
ctx.StartTracing(std::move(grpcRequestProxySpan));
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "request_discriminator.h"

namespace NKikimr::NJaegerTracing {

const TRequestDiscriminator TRequestDiscriminator::EMPTY {
.RequestType = ERequestType::UNSPECIFIED,
};

} // namespace NKikimr::NJaegerTracing
2 changes: 2 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static const THashMap<TStringBuf, ERequestType> NameToRequestType = {

struct TRequestDiscriminator {
ERequestType RequestType = ERequestType::UNSPECIFIED;

static const TRequestDiscriminator EMPTY;
};

} // namespace NKikimr::NJaegerTracing
2 changes: 1 addition & 1 deletion ydb/core/jaeger_tracing/sampling_throttling_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ TSamplingThrottlingControl::TSamplingThrottlingControl(std::unique_ptr<TSampling
: Impl(std::move(initialImpl))
{}

void TSamplingThrottlingControl::HandleTracing(NWilson::TTraceId& traceId, TRequestDiscriminator discriminator) {
void TSamplingThrottlingControl::HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator) {
if (ImplUpdate.load(std::memory_order_relaxed)) {
auto newImpl = ImplUpdate.exchange(nullptr, std::memory_order_relaxed);
Y_ABORT_UNLESS(newImpl);
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/jaeger_tracing/sampling_throttling_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@

namespace NKikimr::NJaegerTracing {

class TSamplingThrottlingControl
: public TThrRefBase
, private TMoveOnly {
class TSamplingThrottlingControl: public TThrRefBase {
friend class TSamplingThrottlingConfigurator;

public:
void HandleTracing(NWilson::TTraceId& traceId, TRequestDiscriminator discriminator);
void HandleTracing(NWilson::TTraceId& traceId, const TRequestDiscriminator& discriminator);

private:
struct TSamplingThrottlingImpl;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/jaeger_tracing/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ PEERDIR(

SRCS(
request_discriminator.h
request_discriminator.cpp
sampler.h
sampling_throttling_configurator.cpp
sampling_throttling_configurator.h
Expand Down
34 changes: 20 additions & 14 deletions ydb/services/keyvalue/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/service_keyvalue.h>
#include <ydb/core/jaeger_tracing/request_discriminator.h>


namespace NKikimr::NGRpcService {
Expand Down Expand Up @@ -40,7 +41,7 @@ void TKeyValueGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
#error SETUP_METHOD macro collision
#endif

#define SETUP_METHOD(methodName, method, rlMode) \
#define SETUP_METHOD(methodName, method, rlMode, requestType) \
MakeIntrusive<NGRpcService::TGRpcRequest< \
Ydb::KeyValue::Y_CAT(methodName, Request), \
Ydb::KeyValue::Y_CAT(methodName, Response), \
Expand All @@ -54,26 +55,31 @@ void TKeyValueGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
ActorSystem->Send(GRpcRequestProxyId, new TGrpcRequestOperationCall< \
Ydb::KeyValue::Y_CAT(methodName, Request), \
Ydb::KeyValue::Y_CAT(methodName, Response)>(reqCtx, &method, \
TRequestAuxSettings{rlMode, nullptr})); \
TRequestAuxSettings { \
.RlMode = rlMode, \
.RequestDiscriminator = { \
.RequestType = NJaegerTracing::ERequestType::requestType, \
}, \
})); \
}, \
&Ydb::KeyValue::V1::KeyValueService::AsyncService::Y_CAT(Request, methodName), \
"KeyValue/" Y_STRINGIZE(methodName), \
logger, \
getCounterBlock("keyvalue", Y_STRINGIZE(methodName)) \
)->Run()

SETUP_METHOD(CreateVolume, DoCreateVolumeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(DropVolume, DoDropVolumeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(AlterVolume, DoAlterVolumeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(DescribeVolume, DoDescribeVolumeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(ListLocalPartitions, DoListLocalPartitionsKeyValue, TRateLimiterMode::Rps);

SETUP_METHOD(AcquireLock, DoAcquireLockKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(ExecuteTransaction, DoExecuteTransactionKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(Read, DoReadKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(ReadRange, DoReadRangeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(ListRange, DoListRangeKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(GetStorageChannelStatus, DoGetStorageChannelStatusKeyValue, TRateLimiterMode::Rps);
SETUP_METHOD(CreateVolume, DoCreateVolumeKeyValue, TRateLimiterMode::Rps, UNSPECIFIED);
SETUP_METHOD(DropVolume, DoDropVolumeKeyValue, TRateLimiterMode::Rps, UNSPECIFIED);
SETUP_METHOD(AlterVolume, DoAlterVolumeKeyValue, TRateLimiterMode::Rps, UNSPECIFIED);
SETUP_METHOD(DescribeVolume, DoDescribeVolumeKeyValue, TRateLimiterMode::Rps, UNSPECIFIED);
SETUP_METHOD(ListLocalPartitions, DoListLocalPartitionsKeyValue, TRateLimiterMode::Rps, UNSPECIFIED);

SETUP_METHOD(AcquireLock, DoAcquireLockKeyValue, TRateLimiterMode::Rps, KEYVALUE_ACQUIRELOCK);
SETUP_METHOD(ExecuteTransaction, DoExecuteTransactionKeyValue, TRateLimiterMode::Rps, KEYVALUE_EXECUTETRANSACTION);
SETUP_METHOD(Read, DoReadKeyValue, TRateLimiterMode::Rps, KEYVALUE_READ);
SETUP_METHOD(ReadRange, DoReadRangeKeyValue, TRateLimiterMode::Rps, KEYVALUE_READRANGE);
SETUP_METHOD(ListRange, DoListRangeKeyValue, TRateLimiterMode::Rps, KEYVALUE_LISTRANGE);
SETUP_METHOD(GetStorageChannelStatus, DoGetStorageChannelStatusKeyValue, TRateLimiterMode::Rps, KEYVALUE_GETSTORAGECHANNELSTATUS);

#undef SETUP_METHOD
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/services/local_discovery/grpc_func_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class TGrpcRequestFunctionCall
return true;
}
}

const NJaegerTracing::TRequestDiscriminator& GetRequestDiscriminator() const override {
return AuxSettings.RequestDiscriminator;
}

private:
TFuncCallback PassMethod;
const TRequestAuxSettings AuxSettings;
Expand Down

0 comments on commit 0592900

Please sign in to comment.