Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/base/counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static const THashSet<TString> DATABASE_SERVICES
TString("pqproxy|readSession"),
TString("pqproxy|schemecache"),
TString("pqproxy|mirrorWriteTimeLag"),
TString("pqproxy|userAgents"),
TString("datastreams"),
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,12 @@ class TGRpcConnectionsImpl
SetDatabaseHeader(meta, dbState->Database);
}

static const TStringType clientPid = GetClientPIDHeaderValue();

meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, CreateSDKBuildInfo()});
meta.Aux.push_back({YDB_CLIENT_PID, clientPid});
meta.Aux.insert(meta.Aux.end(), requestSettings.Header.begin(), requestSettings.Header.end());

dbState->StatCollector.IncGRpcInFlight();
dbState->StatCollector.IncGRpcInFlightByHost(endpoint.GetEndpoint());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "data_plane_helpers.h"
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup,
std::optional<TString> codec,
std::optional<bool> reconnectOnFailure,
THashMap<TString, TString> sessionMeta
THashMap<TString, TString> sessionMeta,
const TString& userAgent
) {
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
Expand All @@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
}
settings.MaxMemoryUsage(1024*1024*1024*1024ll);
settings.Meta_.Fields = sessionMeta;
if (!userAgent.empty()) {
settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
}
return CreateSimpleWriter(driver, settings);
}

Expand All @@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
return TPersQueueClient(driver, clientSettings).CreateReadSession(TReadSessionSettings(settings).DisableClusterDiscovery(true));
}

std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NTopic::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
const TString& userAgent
) {
NYdb::NTopic::TTopicClientSettings clientSettings;
if (creds) clientSettings.CredentialsProviderFactory(creds);
auto readerSettings = settings;
if (!userAgent.empty()) {
readerSettings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
}
return NYdb::NTopic::TTopicClient(driver, clientSettings).CreateReadSession(readerSettings);
}

TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<IReadSession>& reader, TDuration timeout) {
while (true) {
auto future = reader->WaitEvent();
Expand All @@ -99,4 +120,25 @@ namespace NKikimr::NPersQueueTests {
}
return {};
}

TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
while (true) {
auto future = reader->WaitEvent();
future.Wait(timeout);

TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = reader->GetEvent(false, 1);
if (!event)
return {};
if (auto e = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
return *e;
} else if (auto* e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
e->Confirm();
} else if (auto* e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
e->Confirm();
} else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
return {};
}
}
return {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -34,16 +35,24 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup = {},
std::optional<TString> codec = {},
std::optional<bool> reconnectOnFailure = {},
THashMap<TString, TString> sessionMeta = {}
THashMap<TString, TString> sessionMeta = {},
const TString& userAgent = {}
);

std::shared_ptr<NYdb::NPersQueue::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NPersQueue::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr
);

std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NTopic::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr,
const TString& userAgent = ""
);

TMaybe<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NPersQueue::IReadSession>& reader, TDuration timeout = TDuration::Max());
TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout = TDuration::Max());

}
5 changes: 5 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor

void GenerateNextWriteRequest(const NActors::TActorContext& ctx);

void SetupBytesWrittenByUserAgentCounter();
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath,
bool isServerless, const TString& folderId);
Expand Down Expand Up @@ -570,6 +571,7 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor
TInstant StartTime;
NKikimr::NPQ::TPercentileCounter InitLatency;
NKikimr::NPQ::TMultiCounter SLIBigLatency;
NYdb::NPersQueue::TCounterPtr BytesWrittenByUserAgent;

THolder<NPersQueue::TTopicNamesConverterFactory> ConverterFactory;
NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
Expand Down Expand Up @@ -699,6 +701,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
void SendAuthRequest(const TActorContext& ctx);
void CreateInitAndAuthActor(const TActorContext& ctx);

void SetupBytesReadByUserAgentCounter();
void SetupCounters();
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic);
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId,
Expand Down Expand Up @@ -746,6 +749,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
TString Session;
TString PeerName;
TString Database;
TString UserAgent;

bool ClientsideLocksAllowed;
bool BalanceRightNow;
Expand Down Expand Up @@ -929,6 +933,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
NKikimr::NPQ::TPercentileCounter InitLatency;
NKikimr::NPQ::TPercentileCounter CommitLatency;
NKikimr::NPQ::TMultiCounter SLIBigLatency;
NYdb::NPersQueue::TCounterPtr BytesReadByUserAgent;

NKikimr::NPQ::TPercentileCounter ReadLatency;
NKikimr::NPQ::TPercentileCounter ReadLatencyFromDisk;
Expand Down
16 changes: 16 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/services/persqueue_v1/actors/helpers.h>

#include <library/cpp/protobuf/util/repeated_field_utils.h>

#include <util/string/strip.h>
Expand Down Expand Up @@ -655,6 +657,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
Session = session;
ProtocolVersion = init.GetProtocolVersion();
CommitsDisabled = init.GetCommitsDisabled();
UserAgent = init.GetVersion();

if (ProtocolVersion >= NPersQueue::TReadRequest::ReadParamsInInit) {
ReadSettingsInited = true;
Expand Down Expand Up @@ -835,6 +838,14 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
}
}

void TReadSessionActor::SetupBytesReadByUserAgentCounter() {
BytesReadByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents")
->GetSubgroup("host", "")
->GetSubgroup("protocol", "pqv0")
->GetSubgroup("consumer", ClientPath)
->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent))
->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true);
}

void TReadSessionActor::SetupCounters()
{
Expand Down Expand Up @@ -864,6 +875,8 @@ void TReadSessionActor::SetupCounters()
if (ProtocolVersion < NPersQueue::TReadRequest::Batching) {
++(*SessionsWithOldBatchingVersion);
}

SetupBytesReadByUserAgentCounter();
}


Expand Down Expand Up @@ -1527,6 +1540,9 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo

Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
i64 diff = formedResponse->Response.ByteSize();

BytesReadByUserAgent->Add(diff);

const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
Expand Down
17 changes: 17 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/library/persqueue/topic_parser/counters.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/services/persqueue_v1/actors/helpers.h>

#include <ydb/library/actors/core/log.h>
#include <util/string/hex.h>
Expand Down Expand Up @@ -256,6 +257,16 @@ void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) {
}


void TWriteSessionActor::SetupBytesWrittenByUserAgentCounter() {
BytesWrittenByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents")
->GetSubgroup("host", "")
->GetSubgroup("protocol", "pqv0")
->GetSubgroup("topic", FullConverter->GetFederationPath())
->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent))
->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true);
}


void TWriteSessionActor::SetupCounters()
{
if (SessionsCreated) {
Expand Down Expand Up @@ -286,6 +297,8 @@ void TWriteSessionActor::SetupCounters()

SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
}


Expand All @@ -307,6 +320,8 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db

SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
}


Expand Down Expand Up @@ -849,6 +864,8 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont
BytesInflight.Inc(diff);
BytesInflightTotal.Inc(diff);

BytesWrittenByUserAgent->Add(diff);

if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
Y_ABORT_UNLESS(NextRequestInited);
Handler->ReadyForNextRead();
Expand Down
28 changes: 28 additions & 0 deletions ydb/services/persqueue_v1/actors/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,32 @@ bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) {
return false;
}


TString CleanupCounterValueString(const TString& value) {
// Internal Monitoring system requires metrics values to no longer than 200 characters
// and prohibits some ASCII characters.

TString clean;
constexpr auto valueLenghtLimit = 200;

for (auto c : value) {
switch (c) {
case '|':
case '*':
case '?':
case '"':
case '\'':
case '`':
case '\\':
continue;
default:
clean.push_back(c);
if (clean.size() == valueLenghtLimit) {
break;
}
}
}
return clean;
}

}
2 changes: 2 additions & 0 deletions ydb/services/persqueue_v1/actors/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataB

bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data);

TString CleanupCounterValueString(const TString& value);

}
27 changes: 27 additions & 0 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
, AutoPartitioningSupport(false)
{
Y_ASSERT(Request);

if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) {
UserAgent = values[0];
}
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) {
SdkBuildInfo = values[0];
}
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -884,6 +891,18 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
}
}

template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::SetupBytesReadByUserAgentCounter() {
static constexpr auto protocol = UseMigrationProtocol ? "pqv1" : "topic";
BytesReadByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents")
->GetSubgroup("host", "")
->GetSubgroup("protocol", protocol)
->GetSubgroup("consumer", ClientPath)
->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo))
->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent))
->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true);
}

template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::SetupCounters() {
if (SessionsCreated) {
Expand Down Expand Up @@ -913,6 +932,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupCounters() {
++(*SessionsCreated);
++(*SessionsActive);
PartsPerSession.IncFor(Partitions.size(), 1); // for 0

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand All @@ -937,6 +958,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand All @@ -960,6 +983,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -1990,6 +2015,8 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(typename TFormedRead
formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation);
}

BytesReadByUserAgent->Add(sizeEstimation);

if (formedResponse->IsDirectRead) {
auto it = Partitions.find(formedResponse->AssignId);
if (it == Partitions.end()) {
Expand Down
Loading