Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32&
}
}

TString GetV1StatFromV2Plan(const TString& plan) {
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
TStringStream out;
NYson::TYsonWriter writer(&out);
writer.OnBeginMap();
Expand All @@ -358,6 +358,9 @@ TString GetV1StatFromV2Plan(const TString& plan) {
totals.MaxMemoryUsage.Write(writer, "MaxMemoryUsage");
totals.CpuTimeUs.Write(writer, "CpuTimeUs");
totals.SourceCpuTimeUs.Write(writer, "SourceCpuTimeUs");
if (cpuUsage) {
*cpuUsage = (totals.CpuTimeUs.Sum + totals.SourceCpuTimeUs.Sum) / 1000000.0;
}
totals.InputBytes.Write(writer, "InputBytes");
totals.InputRows.Write(writer, "InputRows");
totals.OutputBytes.Write(writer, "OutputBytes");
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/compute/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
tableSettings);
}

TString GetV1StatFromV2Plan(const TString& plan);
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr);
TString GetV1StatFromV2PlanV2(const TString& plan);

TString FormatDurationMs(ui64 durationMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct TDatabaseClients {
TActorId ActorId;
NConfig::TComputeDatabaseConfig Config;
TActorId DatabasesCacheActorId;
TActorId MonitoringActorId;
};

std::optional<TClientConfig> GetClient(const TString& scope, const TString& endpoint, const TString& database) const {
Expand Down Expand Up @@ -306,6 +307,7 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
switch (controlPlane.type_case()) {
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
case NConfig::TYdbComputeControlPlane::kSingle:
CreateSingleClientActors(controlPlane.GetSingle());
break;
case NConfig::TYdbComputeControlPlane::kCms:
CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod());
Expand All @@ -317,6 +319,16 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc);
}

static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TYdbStorageConfig& connection) {
NCloud::TGrpcClientSettings settings;
settings.Endpoint = connection.GetEndpoint();
settings.EnableSsl = connection.GetUseSsl();
if (connection.GetCertificateFile()) {
settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
}
return settings;
}

static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
NCloud::TGrpcClientSettings settings;
const auto& connection = config.GetControlPlaneConnection();
Expand All @@ -328,20 +340,45 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
return settings;
}

void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
if (globalLoadConfig.GetEnable()) {
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release());
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
}
}

void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) {
const auto& mapping = cmsConfig.GetDatabaseMapping();
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
for (const auto& config: mapping.GetCommon()) {
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
TActorId databaseMonitoringActor;
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
? Config.GetYdb().GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
}
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor});
}

Y_ABORT_UNLESS(Clients->CommonDatabaseClients);

for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
TActorId databaseMonitoringActor;
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
? Config.GetYdb().GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
}
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor};
}
}

Expand All @@ -350,20 +387,23 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
for (const auto& config: mapping.GetCommon()) {
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, {}});
}

Y_ABORT_UNLESS(Clients->CommonDatabaseClients);

for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, {}};
}
}

STRICT_STFUNC(StateFunc,
hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle);
hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle);
)

void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
Expand All @@ -374,7 +414,38 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
Register(new TCreateDatabaseRequestActor(Clients, SynchronizationServiceActorId, Config, ev));
}

void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
if (actorId != TActorId{}) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        if (actorId != TActorId{}) {
            Send(ev->Forward(actorId));
            return;
        }
        Send(ev->Sender, new TEvYdbCompute::TEvCpuLoadResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load monitoring disabled"}}), 0, ev->Cookie);

to get rid of else

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it is better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does not matter in this case imho
but yes, no "else" - simplier code

Send(ev->Forward(actorId));
} else {
Send(ev->Sender, new TEvYdbCompute::TEvCpuLoadResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load monitoring disabled"}}), 0, ev->Cookie);
}
}

void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
if (actorId != TActorId{}) {
Send(ev->Forward(actorId));
} else {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie);
}
}

void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
if (actorId != TActorId{}) {
Send(ev->Forward(actorId));
}
}

private:
TActorId GetMonitoringActorIdByScope(const TString& scope) {
return Config.GetYdb().GetControlPlane().HasSingle()
? MonitoringActorId
: Clients->GetClient(scope).MonitoringActorId;
}

TActorId SynchronizationServiceActorId;
NFq::NConfig::TComputeConfig Config;
std::shared_ptr<TDatabaseClients> Clients;
Expand All @@ -383,6 +454,8 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
TYqSharedResources::TPtr YqSharedResources;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
::NMonitoring::TDynamicCounterPtr Counters;
TActorId MonitoringClientActorId;
TActorId MonitoringActorId;
};

std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli

std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters);

std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);

std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters);

}
Loading