Skip to content

Commit

Permalink
mon page has been added to row dispatcher (#11443)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Nov 10, 2024
1 parent a27ab2e commit e2103f9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
31 changes: 27 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/mon/mon.h>

#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
Expand Down Expand Up @@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
void Handle(const NMon::TEvHttpInfo::TPtr&);

void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
void UpdateMetrics();
void PrintInternalState();
TString GetInternalState();

STRICT_STFUNC(
StateFunc, {
Expand All @@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
})
};

Expand Down Expand Up @@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() {
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());

NActors::TMon* mon = NKikimr::AppData()->Mon;
if (mon) {
::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
Expand Down Expand Up @@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() {
}
}

void TRowDispatcher::PrintInternalState() {
TString TRowDispatcher::GetInternalState() {
TStringStream str;
str << "Statistics:\n";
for (auto& [key, sessionsInfo] : TopicSessions) {
Expand All @@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() {
}
}
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());
return str.Str();
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
Expand Down Expand Up @@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
}

void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
PrintInternalState();
LOG_ROW_DISPATCHER_DEBUG(GetInternalState());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
}

void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
TStringStream str;
HTML(str) {
PRE() {
str << "Current state:" << Endl;
str << GetInternalState() << Endl;
str << Endl;
}
}
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
const auto& key = ev->Get()->Stat.SessionKey;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ SRCS(
PEERDIR(
contrib/libs/fmt
contrib/libs/simdjson
ydb/core/base
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/shared_resources
ydb/core/fq/libs/ydb
ydb/core/mon
ydb/library/actors/core
ydb/library/security
ydb/library/yql/dq/actors/common
Expand Down

0 comments on commit e2103f9

Please sign in to comment.