Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/kqp/gateway/actors/analyze_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
TStringBuilder() << "Can't get statistics aggregator ID.", {}
)
);
this->Die(ctx);
}

this->Die(ctx);
return;
}

Expand Down
12 changes: 7 additions & 5 deletions ydb/core/statistics/aggregator/tx_analyze_table_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,32 @@ struct TStatisticsAggregator::TTxAnalyzeTableResponse : public TTxBase {
const TPathId pathId = PathIdFromPathId(Record.GetPathId());
auto operationTable = Self->ForceTraversalTable(operationId, pathId);
if (!operationTable) {
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown OperationTable. Record: " << Record.ShortDebugString());
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown OperationTable. Record: " << Record.ShortDebugString());
return true;
}

auto analyzedShard = std::find_if(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
[tabletId = Record.GetShardTabletId()] (TAnalyzedShard& analyzedShard) { return analyzedShard.ShardTabletId == tabletId;});
if (analyzedShard == operationTable->AnalyzedShards.end()) {
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
return true;
}
if (analyzedShard->Status != TAnalyzedShard::EStatus::AnalyzeStarted) {
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
}

analyzedShard->Status = TAnalyzedShard::EStatus::AnalyzeFinished;

bool completeResponse = std::any_of(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
[] (const TAnalyzedShard& analyzedShard) { return analyzedShard.Status == TAnalyzedShard::EStatus::AnalyzeFinished;});

if (!completeResponse)
if (!completeResponse) {
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. There are shards which are not analyzed");
return true;

}
NIceDb::TNiceDb db(txc.DB);
Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeFinished, operationId, *operationTable, db);
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. All shards are analyzed");
return true;
}

Expand Down
6 changes: 2 additions & 4 deletions ydb/core/statistics/aggregator/tx_navigate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase {
}

if (Self->TraversalIsColumnTable) {
// TODO: serverless case
if (entry.DomainInfo->Params.HasHive()) {
Self->HiveId = entry.DomainInfo->Params.GetHive();
} else {
Self->HiveId = entry.DomainInfo->ExtractHive();
if (Self->HiveId == 0) {
Self->HiveId = AppData()->DomainsInfo->GetHive();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
}

bool Execute(TTransactionContext& txc, const TActorContext&) override {
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute");
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Node count = " << HiveRecord.NodesSize());

auto distribution = Self->TabletsForReqDistribution;
for (auto& inNode : HiveRecord.GetNodes()) {
if (inNode.GetNodeId() == 0) {
// these tablets are probably in Hive boot queue
if (Self->HiveRequestRound < Self->MaxHiveRequestRoundCount) {
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets are probably in Hive boot queue");
Action = EAction::ScheduleReqDistribution;
}
continue;
Expand All @@ -76,6 +77,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
}

if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) {
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size());
// these tablets do not exist in Hive anymore
Self->NavigatePathId = Self->TraversalPathId;
Action = EAction::ScheduleResolve;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {

AnalyzeTable(runtime, tableInfo.ShardIds[0], tableInfo.PathId);
}

Y_UNIT_TEST(Analyze) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
Expand All @@ -28,6 +28,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
}

Y_UNIT_TEST(AnalyzeServerless) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 1)[0];

Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
}

Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
Expand Down
85 changes: 70 additions & 15 deletions ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,36 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId*
*domainKey = resultEntry.DomainInfo->DomainKey;
}

if (saTabletId && resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
if (saTabletId) {
if (resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
} else {
auto resourcesDomainKey = resultEntry.DomainInfo->ResourcesDomainKey;
auto request = std::make_unique<TNavigate>();
auto& entry = request->ResultSet.emplace_back();
entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId);
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.Operation = TNavigate::EOp::OpPath;
entry.RedirectRequired = false;
runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.release()));

auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
UNIT_ASSERT(ev);
UNIT_ASSERT(ev->Get());
std::unique_ptr<TNavigate> response(ev->Get()->Request.Release());
UNIT_ASSERT(response->ResultSet.size() == 1);
auto& secondResultEntry = response->ResultSet[0];

if (secondResultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
*saTabletId = secondResultEntry.DomainInfo->Params.GetStatisticsAggregator();
}
}
}

return resultEntry.TableId.PathId;
}

NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString &path)
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path)
{
TAutoPtr<IEventHandle> handle;

Expand All @@ -175,7 +197,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime,
return *reply->MutableRecord();
}

TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString &path)
TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
{
TVector<ui64> shards;
auto lsResult = DescribeTable(runtime, sender, path);
Expand All @@ -185,7 +207,7 @@ TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const
return shards;
}

TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender,const TString &path)
TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
{
TVector<ui64> shards;
auto lsResult = DescribeTable(runtime, sender, path);
Expand Down Expand Up @@ -275,6 +297,21 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS
env.GetController()->WaitActualization(TDuration::Seconds(1));
}

std::vector<TTableInfo> GatherColumnTablesInfo(TTestEnv& env, ui8 tableCount) {
auto& runtime = *env.GetServer().GetRuntime();
auto sender = runtime.AllocateEdgeActor();

std::vector<TTableInfo> ret;
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
TTableInfo tableInfo;
const TString path = Sprintf("/Root/Database/Table%u", tableId);
tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path);
tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId);
ret.emplace_back(tableInfo);
}
return ret;
}

std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) {
auto init = [&] () {
CreateDatabase(env, "Database");
Expand All @@ -285,20 +322,38 @@ std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount
std::thread initThread(init);

auto& runtime = *env.GetServer().GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SimulateSleep(TDuration::Seconds(10));
initThread.join();

std::vector<TTableInfo> ret;
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
TTableInfo tableInfo;
const TString path = Sprintf("/Root/Database/Table%u", tableId);
tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path);
tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId);
ret.emplace_back(tableInfo);
}
return ret;
return GatherColumnTablesInfo(env, tableCount);
}

std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) {
auto init = [&] () {
CreateDatabase(env, "Shared");
};
std::thread initThread(init);

auto& runtime = *env.GetServer().GetRuntime();
runtime.SimulateSleep(TDuration::Seconds(5));
initThread.join();

TPathId domainKey;
ResolvePathId(runtime, "/Root/Shared", &domainKey);

auto init2 = [&] () {
CreateServerlessDatabase(env, "Database", domainKey);
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount);
}
};
std::thread init2Thread(init2);

runtime.SimulateSleep(TDuration::Seconds(5));
init2Thread.join();

return GatherColumnTablesInfo(env, tableCount);
}

void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/statistics/ut_common/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ struct TTableInfo {
TPathId PathId;
};
std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);
std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);

TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr);

Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/scheme_cache/scheme_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
if (descr.HasServerlessComputeResourcesMode()) {
ServerlessComputeResourcesMode = descr.GetServerlessComputeResourcesMode();
}

if (descr.HasSharedHive()) {
SharedHiveId = descr.GetSharedHive();
}
}

inline ui64 GetVersion() const {
Expand All @@ -80,6 +84,14 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
}
}

inline ui64 ExtractHive() const {
if (IsServerless()) {
return SharedHiveId;
} else {
return Params.GetHive();
}
}

inline bool IsServerless() const {
return DomainKey != ResourcesDomainKey;
}
Expand All @@ -89,6 +101,7 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
NKikimrSubDomains::TProcessingParams Params;
TCoordinators Coordinators;
TMaybeServerlessComputeResourcesMode ServerlessComputeResourcesMode;
ui64 SharedHiveId = 0;

TString ToString() const;

Expand Down