Skip to content

Commit 30de794

Browse files
authored
Merge 01896af into b971ba4
2 parents b971ba4 + 01896af commit 30de794

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+999
-97
lines changed

ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,18 +118,18 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
118118

119119
TPoolSettings resourcePoolSettings;
120120
auto& properties = *resourcePoolDescription.MutableProperties()->MutableProperties();
121-
for (const auto& [property, setting] : GetPropertiesMap(resourcePoolSettings, true)) {
121+
for (const auto& [property, setting] : resourcePoolSettings.GetPropertiesMap(true)) {
122122
if (std::optional<TString> value = featuresExtractor.Extract(property)) {
123123
try {
124-
std::visit(TSettingsParser{*value}, setting);
124+
std::visit(TPoolSettings::TParser{*value}, setting);
125125
} catch (...) {
126126
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
127127
}
128128
} else if (!featuresExtractor.ExtractResetFeature(property)) {
129129
continue;
130130
}
131131

132-
TString value = std::visit(TSettingsExtractor(), setting);
132+
const TString value = std::visit(TPoolSettings::TExtractor(), setting);
133133
properties.insert({property, value});
134134
}
135135

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "behaviour.h"
2+
#include "initializer.h"
3+
#include "manager.h"
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
TResourcePoolClassifierBehaviour::TFactory::TRegistrator<TResourcePoolClassifierBehaviour> TResourcePoolClassifierBehaviour::Registrator(TResourcePoolClassifierConfig::GetTypeId());
9+
10+
NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolClassifierBehaviour::ConstructInitializer() const {
11+
return std::make_shared<TResourcePoolClassifierInitializer>();
12+
}
13+
14+
NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolClassifierBehaviour::ConstructOperationsManager() const {
15+
return std::make_shared<TResourcePoolClassifierManager>();
16+
}
17+
18+
TString TResourcePoolClassifierBehaviour::GetInternalStorageTablePath() const {
19+
return "resource_pools/resource_pools_classifiers";
20+
}
21+
22+
TString TResourcePoolClassifierBehaviour::GetTypeId() const {
23+
return TResourcePoolClassifierConfig::GetTypeId();
24+
}
25+
26+
NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierBehaviour::GetInstance() {
27+
static std::shared_ptr<TResourcePoolClassifierBehaviour> result = std::make_shared<TResourcePoolClassifierBehaviour>();
28+
return result;
29+
}
30+
31+
} // namespace NKikimr::NKqp
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/abstract/initialization.h>
6+
#include <ydb/services/metadata/abstract/kqp_common.h>
7+
8+
9+
namespace NKikimr::NKqp {
10+
11+
class TResourcePoolClassifierBehaviour : public NMetadata::TClassBehaviour<TResourcePoolClassifierConfig> {
12+
static TFactory::TRegistrator<TResourcePoolClassifierBehaviour> Registrator;
13+
14+
protected:
15+
virtual NMetadata::NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
16+
virtual NMetadata::NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
17+
virtual TString GetInternalStorageTablePath() const override;
18+
19+
public:
20+
virtual TString GetTypeId() const override;
21+
22+
static IClassBehaviour::TPtr GetInstance();
23+
};
24+
25+
} // namespace NKikimr::NKqp
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
#include "checker.h"
2+
3+
#include <ydb/core/cms/console/configs_dispatcher.h>
4+
#include <ydb/core/protos/console_config.pb.h>
5+
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>
6+
7+
#include <ydb/library/query_actor/query_actor.h>
8+
9+
10+
namespace NKikimr::NKqp {
11+
12+
namespace {
13+
14+
using namespace NActors;
15+
using namespace NResourcePool;
16+
17+
18+
struct TEvPrivate {
19+
enum EEv : ui32 {
20+
EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE),
21+
22+
EvEnd
23+
};
24+
25+
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
26+
27+
struct TEvRanksCheckerResponse : public TEventLocal<TEvRanksCheckerResponse, EvRanksCheckerResponse> {
28+
TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues)
29+
: Status(status)
30+
, MaxRank(maxRank)
31+
, NumberClassifiers(numberClassifiers)
32+
, Issues(std::move(issues))
33+
{}
34+
35+
const Ydb::StatusIds::StatusCode Status;
36+
const i64 MaxRank;
37+
const ui64 NumberClassifiers;
38+
const NYql::TIssues Issues;
39+
};
40+
};
41+
42+
class TRanksCheckerActor : public NKikimr::TQueryBase {
43+
using TBase = NKikimr::TQueryBase;
44+
45+
public:
46+
TRanksCheckerActor(const TString& database, const TString& sessionId, const TString& transactionId, const std::unordered_map<i64, TString>& ranksToCheck)
47+
: TBase(NKikimrServices::KQP_GATEWAY, sessionId)
48+
, Database(database)
49+
, RanksToCheck(ranksToCheck)
50+
{
51+
TxId = transactionId;
52+
SetOperationInfo(__func__, Database);
53+
}
54+
55+
void OnRunQuery() override {
56+
const auto& tablePath = TResourcePoolClassifierConfig::GetBehaviour()->GetStorageTablePath();
57+
58+
TString sql = TStringBuilder() << R"(
59+
-- TRanksCheckerActor::OnRunQuery
60+
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
61+
62+
DECLARE $database AS Text;
63+
DECLARE $ranks AS List<Int64>;
64+
65+
SELECT
66+
rank, name
67+
FROM `)" << tablePath << R"(`
68+
WHERE database = $database
69+
AND rank IN $ranks;
70+
71+
SELECT
72+
MAX(rank) AS MaxRank,
73+
COUNT(*) AS NumberClassifiers
74+
FROM `)" << tablePath << R"(`
75+
WHERE database = $database;
76+
)";
77+
78+
NYdb::TParamsBuilder params;
79+
params
80+
.AddParam("$database")
81+
.Utf8(Database)
82+
.Build();
83+
84+
auto& param = params.AddParam("$ranks").BeginList();
85+
for (const auto& [rank, _] : RanksToCheck) {
86+
param.AddListItem().Int64(rank);
87+
}
88+
param.EndList().Build();
89+
90+
RunDataQuery(sql, &params, TTxControl::ContinueTx());
91+
}
92+
93+
void OnQueryResult() override {
94+
if (ResultSets.size() != 2) {
95+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
96+
return;
97+
}
98+
99+
{ // Ranks description
100+
NYdb::TResultSetParser result(ResultSets[0]);
101+
while (result.TryNextRow()) {
102+
TMaybe<i64> rank = result.ColumnParser("rank").GetOptionalInt64();
103+
if (!rank) {
104+
continue;
105+
}
106+
107+
TMaybe<TString> name = result.ColumnParser("name").GetOptionalUtf8();
108+
if (!name) {
109+
continue;
110+
}
111+
112+
if (auto it = RanksToCheck.find(*rank); it != RanksToCheck.end() && it->second != *name) {
113+
Finish(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Classifier with rank " << *rank << " already exists (classifier name " << it->second << ")");
114+
return;
115+
}
116+
}
117+
}
118+
119+
{ // Classifiers stats
120+
NYdb::TResultSetParser result(ResultSets[1]);
121+
if (!result.TryNextRow()) {
122+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
123+
return;
124+
}
125+
126+
MaxRank = result.ColumnParser("MaxRank").GetOptionalInt64().GetOrElse(0);
127+
NumberClassifiers = result.ColumnParser("NumberClassifiers").GetUint64();
128+
}
129+
130+
Finish();
131+
}
132+
133+
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
134+
Send(Owner, new TEvPrivate::TEvRanksCheckerResponse(status, MaxRank, NumberClassifiers, std::move(issues)));
135+
}
136+
137+
private:
138+
const TString Database;
139+
const std::unordered_map<i64, TString> RanksToCheck;
140+
141+
i64 MaxRank = 0;
142+
ui64 NumberClassifiers = 0;
143+
};
144+
145+
class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResourcePoolClassifierPreparationActor> {
146+
public:
147+
TResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext)
148+
: Context(context)
149+
, AlterContext(alterContext)
150+
, Controller(std::move(controller))
151+
, PatchedObjects(std::move(patchedObjects))
152+
{}
153+
154+
void Bootstrap() {
155+
Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvGetConfigRequest(
156+
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
157+
), IEventHandle::FlagTrackDelivery);
158+
}
159+
160+
void Handle(TEvents::TEvUndelivered::TPtr& ev) {
161+
switch (ev->Get()->SourceType) {
162+
case NConsole::TEvConfigsDispatcher::EvGetConfigRequest:
163+
CheckFeatureFlag(AppData()->FeatureFlags);
164+
break;
165+
166+
default:
167+
break;
168+
}
169+
}
170+
171+
void Handle(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse::TPtr& ev) {
172+
CheckFeatureFlag(ev->Get()->Config->GetFeatureFlags());
173+
}
174+
175+
void Handle(TEvPrivate::TEvRanksCheckerResponse::TPtr& ev) {
176+
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
177+
NYql::TIssue rootIssue(TStringBuilder() << "Resource pool classifier ranks check failed, " << ev->Get()->Status);
178+
for (const auto& issue : ev->Get()->Issues) {
179+
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
180+
}
181+
FailAndPassAway(rootIssue.ToString(true));
182+
return;
183+
}
184+
185+
if (ev->Get()->NumberClassifiers >= CLASSIFIER_COUNT_LIMIT) {
186+
FailAndPassAway(TStringBuilder() << "Number of resource pool classifiers reached limit in " << CLASSIFIER_COUNT_LIMIT);
187+
return;
188+
}
189+
190+
i64 maxRank = ev->Get()->MaxRank;
191+
for (auto& object : PatchedObjects) {
192+
if (object.GetRank() != -1) {
193+
continue;
194+
}
195+
if (maxRank > std::numeric_limits<i64>::max() - CLASSIFIER_RANK_OFFSET) {
196+
FailAndPassAway(TStringBuilder() << "The rank could not be set automatically, the maximum rank of the resource pool classifier is too high " << ev->Get()->MaxRank);
197+
return;
198+
}
199+
200+
maxRank += CLASSIFIER_RANK_OFFSET;
201+
object.SetRank(maxRank);
202+
}
203+
204+
RanksChecked = true;
205+
TryFinish();
206+
}
207+
208+
STRICT_STFUNC(StateFunc,
209+
hFunc(TEvents::TEvUndelivered, Handle);
210+
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
211+
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
212+
)
213+
214+
private:
215+
void ValidateRanks() {
216+
if (Context.GetActivityType() == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
217+
RanksChecked = true;
218+
TryFinish();
219+
return;
220+
}
221+
222+
std::unordered_map<i64, TString> ranksToNames;
223+
for (const auto& object : PatchedObjects) {
224+
const auto rank = object.GetRank();
225+
if (rank == -1) {
226+
continue;
227+
}
228+
if (!ranksToNames.insert({rank, object.GetName()}).second) {
229+
FailAndPassAway(TStringBuilder() << "Found duplicate rank " << rank);
230+
}
231+
}
232+
233+
Register(new TQueryRetryActor<TRanksCheckerActor, TEvPrivate::TEvRanksCheckerResponse, TString, TString, TString, std::unordered_map<i64, TString>>(
234+
SelfId(), Context.GetExternalData().GetDatabase(), AlterContext.SessionId, AlterContext.TransactionId, ranksToNames
235+
));
236+
}
237+
238+
void CheckFeatureFlag(const NKikimrConfig::TFeatureFlags& featureFlags) {
239+
if (!featureFlags.GetEnableResourcePools()) {
240+
FailAndPassAway("Resource pools are disabled. Please contact your system administrator to enable it");
241+
return;
242+
}
243+
244+
FeatureFlagChecked = true;
245+
TryFinish();
246+
}
247+
248+
void FailAndPassAway(const TString& message) {
249+
Controller->OnPreparationProblem(message);
250+
PassAway();
251+
}
252+
253+
void TryFinish() {
254+
if (!FeatureFlagChecked || !RanksChecked) {
255+
return;
256+
}
257+
258+
Controller->OnPreparationFinished(std::move(PatchedObjects));
259+
PassAway();
260+
}
261+
262+
private:
263+
const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& Context;
264+
const NMetadata::NModifications::TAlterOperationContext& AlterContext;
265+
266+
bool FeatureFlagChecked = false;
267+
bool RanksChecked = false;
268+
269+
NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr Controller;
270+
std::vector<TResourcePoolClassifierConfig> PatchedObjects;
271+
};
272+
273+
} // anonymous namespace
274+
275+
IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext) {
276+
return new TResourcePoolClassifierPreparationActor(std::move(patchedObjects), std::move(controller), context, alterContext);
277+
}
278+
279+
} // namespace NKikimr::NKqp
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include "object.h"
4+
5+
#include <ydb/services/metadata/manager/generic_manager.h>
6+
7+
8+
namespace NKikimr::NKqp {
9+
10+
NActors::IActor* CreateResourcePoolClassifierPreparationActor(std::vector<TResourcePoolClassifierConfig>&& patchedObjects, NMetadata::NModifications::IAlterPreparationController<TResourcePoolClassifierConfig>::TPtr controller, const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext);
11+
12+
} // namespace NKikimr::NKqp
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#include "fetcher.h"
2+
3+
4+
namespace NKikimr::NKqp {
5+
6+
std::vector<NMetadata::IClassBehaviour::TPtr> TResourcePoolClassifierSnapshotsFetcher::DoGetManagers() const {
7+
return {TResourcePoolClassifierConfig::GetBehaviour()};
8+
}
9+
10+
} // namespace NKikimr::NKqp
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include "snapshot.h"
4+
5+
6+
namespace NKikimr::NKqp {
7+
8+
class TResourcePoolClassifierSnapshotsFetcher : public NMetadata::NFetcher::TSnapshotsFetcher<TResourcePoolClassifierSnapshot> {
9+
protected:
10+
virtual std::vector<NMetadata::IClassBehaviour::TPtr> DoGetManagers() const override;
11+
};
12+
13+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)