From aa1075547fe4052363e7ff8446bd476b5ca99b1d Mon Sep 17 00:00:00 2001 From: Alexander Dmitriev Date: Wed, 4 Sep 2024 16:34:36 +0000 Subject: [PATCH 1/3] fix analyze for serverless case --- ydb/core/kqp/gateway/actors/analyze_actor.cpp | 3 +- .../aggregator/tx_analyze_table_response.cpp | 12 ++-- .../statistics/aggregator/tx_navigate.cpp | 7 +- .../tx_response_tablet_distribution.cpp | 4 +- .../aggregator/ut/ut_analyze_columnshard.cpp | 10 ++- ydb/core/statistics/ut_common/ut_common.cpp | 68 +++++++++++++++++-- ydb/core/statistics/ut_common/ut_common.h | 1 + ydb/core/tx/scheme_cache/scheme_cache.h | 13 ++++ 8 files changed, 98 insertions(+), 20 deletions(-) diff --git a/ydb/core/kqp/gateway/actors/analyze_actor.cpp b/ydb/core/kqp/gateway/actors/analyze_actor.cpp index 326d39a44002..84f18e3047d5 100644 --- a/ydb/core/kqp/gateway/actors/analyze_actor.cpp +++ b/ydb/core/kqp/gateway/actors/analyze_actor.cpp @@ -105,9 +105,8 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& TStringBuilder() << "Can't get statistics aggregator ID.", {} ) ); + this->Die(ctx); } - - this->Die(ctx); return; } diff --git a/ydb/core/statistics/aggregator/tx_analyze_table_response.cpp b/ydb/core/statistics/aggregator/tx_analyze_table_response.cpp index 8834c6649698..2bbade5ed146 100644 --- a/ydb/core/statistics/aggregator/tx_analyze_table_response.cpp +++ b/ydb/core/statistics/aggregator/tx_analyze_table_response.cpp @@ -24,18 +24,18 @@ struct TStatisticsAggregator::TTxAnalyzeTableResponse : public TTxBase { const TPathId pathId = PathIdFromPathId(Record.GetPathId()); auto operationTable = Self->ForceTraversalTable(operationId, pathId); if (!operationTable) { - SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown OperationTable. Record: " << Record.ShortDebugString()); + SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown OperationTable. Record: " << Record.ShortDebugString()); return true; } auto analyzedShard = std::find_if(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(), [tabletId = Record.GetShardTabletId()] (TAnalyzedShard& analyzedShard) { return analyzedShard.ShardTabletId == tabletId;}); if (analyzedShard == operationTable->AnalyzedShards.end()) { - SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId()); + SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId()); return true; } if (analyzedShard->Status != TAnalyzedShard::EStatus::AnalyzeStarted) { - SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId()); + SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId()); } analyzedShard->Status = TAnalyzedShard::EStatus::AnalyzeFinished; @@ -43,11 +43,13 @@ struct TStatisticsAggregator::TTxAnalyzeTableResponse : public TTxBase { bool completeResponse = std::any_of(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(), [] (const TAnalyzedShard& analyzedShard) { return analyzedShard.Status == TAnalyzedShard::EStatus::AnalyzeFinished;}); - if (!completeResponse) + if (!completeResponse) { + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. There are shards which are not analyzed"); return true; - + } NIceDb::TNiceDb db(txc.DB); Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeFinished, operationId, *operationTable, db); + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. All shards are analyzed"); return true; } diff --git a/ydb/core/statistics/aggregator/tx_navigate.cpp b/ydb/core/statistics/aggregator/tx_navigate.cpp index 35534d042b06..6d8d178c0881 100644 --- a/ydb/core/statistics/aggregator/tx_navigate.cpp +++ b/ydb/core/statistics/aggregator/tx_navigate.cpp @@ -59,12 +59,7 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase { } if (Self->TraversalIsColumnTable) { - // TODO: serverless case - if (entry.DomainInfo->Params.HasHive()) { - Self->HiveId = entry.DomainInfo->Params.GetHive(); - } else { - Self->HiveId = AppData()->DomainsInfo->GetHive(); - } + Self->HiveId = entry.DomainInfo->ExtractHive(); } return true; diff --git a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp index 5faf80918c8f..323a8dec6afe 100644 --- a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp +++ b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp @@ -55,13 +55,14 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { } bool Execute(TTransactionContext& txc, const TActorContext&) override { - SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute"); + SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Node count = " << HiveRecord.NodesSize()); auto distribution = Self->TabletsForReqDistribution; for (auto& inNode : HiveRecord.GetNodes()) { if (inNode.GetNodeId() == 0) { // these tablets are probably in Hive boot queue if (Self->HiveRequestRound < Self->MaxHiveRequestRoundCount) { + SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets are probably in Hive boot queue"); Action = EAction::ScheduleReqDistribution; } continue; @@ -76,6 +77,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { } if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) { + SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size()); // these tablets do not exist in Hive anymore Self->NavigatePathId = Self->TraversalPathId; Action = EAction::ScheduleResolve; diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 4367cdc300be..d1c9f32ee43d 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -19,7 +19,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { AnalyzeTable(runtime, tableInfo.ShardIds[0], tableInfo.PathId); } - + Y_UNIT_TEST(Analyze) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); @@ -28,6 +28,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); } + Y_UNIT_TEST(AnalyzeServerless) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 1)[0]; + + Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); + } + Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index b00c70c4bdd5..fa27bd71e4ae 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -155,14 +155,36 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* *domainKey = resultEntry.DomainInfo->DomainKey; } - if (saTabletId && resultEntry.DomainInfo->Params.HasStatisticsAggregator()) { - *saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator(); + if (saTabletId) { + if (resultEntry.DomainInfo->Params.HasStatisticsAggregator()) { + *saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator(); + } else { + auto resourcesDomainKey = resultEntry.DomainInfo->ResourcesDomainKey; + auto request = std::make_unique(); + auto& entry = request->ResultSet.emplace_back(); + entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId); + entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; + entry.Operation = TNavigate::EOp::OpPath; + entry.RedirectRequired = false; + runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.release())); + + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()); + std::unique_ptr response(ev->Get()->Request.Release()); + UNIT_ASSERT(response->ResultSet.size() == 1); + auto& secondResultEntry = response->ResultSet[0]; + + if (secondResultEntry.DomainInfo->Params.HasStatisticsAggregator()) { + *saTabletId = secondResultEntry.DomainInfo->Params.GetStatisticsAggregator(); + } + } } return resultEntry.TableId.PathId; } -NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString &path) +NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TAutoPtr handle; @@ -175,7 +197,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, return *reply->MutableRecord(); } -TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString &path) +TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector shards; auto lsResult = DescribeTable(runtime, sender, path); @@ -185,7 +207,7 @@ TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const return shards; } -TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender,const TString &path) +TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector shards; auto lsResult = DescribeTable(runtime, sender, path); @@ -301,6 +323,42 @@ std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount return ret; } +std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { + auto init = [&] () { + CreateDatabase(env, "Shared"); + }; + std::thread initThread(init); + + auto& runtime = *env.GetServer().GetRuntime(); + runtime.SimulateSleep(TDuration::Seconds(5)); + initThread.join(); + + TPathId domainKey; + ResolvePathId(runtime, "/Root/Shared", &domainKey); + + auto init2 = [&] () { + CreateServerlessDatabase(env, "Serverless", domainKey); + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + CreateColumnStoreTable(env, "Serverless", Sprintf("Table%u", tableId), shardCount); + } + }; + std::thread init2Thread(init2); + + runtime.SimulateSleep(TDuration::Seconds(5)); + init2Thread.join(); + + auto sender = runtime.AllocateEdgeActor(); + std::vector ret; + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + TTableInfo tableInfo; + const TString path = Sprintf("/Root/Serverless/Table%u", tableId); + tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path); + tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId); + ret.emplace_back(tableInfo); + } + return ret; +} + void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { TTableClient client(env.GetDriver()); auto session = client.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index d31dee1a4e1a..47132fc87390 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -82,6 +82,7 @@ struct TTableInfo { TPathId PathId; }; std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); +std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr); diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 570f47e67f4d..34e0210bdd8c 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -66,6 +66,10 @@ struct TDomainInfo : public TAtomicRefCount { if (descr.HasServerlessComputeResourcesMode()) { ServerlessComputeResourcesMode = descr.GetServerlessComputeResourcesMode(); } + + if (descr.HasSharedHive()) { + SharedHiveId = descr.GetSharedHive(); + } } inline ui64 GetVersion() const { @@ -80,6 +84,14 @@ struct TDomainInfo : public TAtomicRefCount { } } + inline ui64 ExtractHive() const { + if (IsServerless()) { + return SharedHiveId; + } else { + return Params.GetHive(); + } + } + inline bool IsServerless() const { return DomainKey != ResourcesDomainKey; } @@ -89,6 +101,7 @@ struct TDomainInfo : public TAtomicRefCount { NKikimrSubDomains::TProcessingParams Params; TCoordinators Coordinators; TMaybeServerlessComputeResourcesMode ServerlessComputeResourcesMode; + ui64 SharedHiveId = 0; TString ToString() const; From 45ff273b4376cef2d3e15c89558cac609326dee2 Mon Sep 17 00:00:00 2001 From: Alexander Dmitriev Date: Thu, 5 Sep 2024 09:34:13 +0000 Subject: [PATCH 2/3] correct hive id for tests --- ydb/core/statistics/aggregator/tx_navigate.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/statistics/aggregator/tx_navigate.cpp b/ydb/core/statistics/aggregator/tx_navigate.cpp index 6d8d178c0881..79df2a01f778 100644 --- a/ydb/core/statistics/aggregator/tx_navigate.cpp +++ b/ydb/core/statistics/aggregator/tx_navigate.cpp @@ -60,6 +60,9 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase { if (Self->TraversalIsColumnTable) { Self->HiveId = entry.DomainInfo->ExtractHive(); + if (Self->HiveId == 0) { + Self->HiveId = AppData()->DomainsInfo->GetHive(); + } } return true; From 92ab4f7c2f80d29409ca13184bc4430e3453681b Mon Sep 17 00:00:00 2001 From: Alexander Dmitriev Date: Thu, 5 Sep 2024 10:01:15 +0000 Subject: [PATCH 3/3] small test refactoring --- ydb/core/statistics/ut_common/ut_common.cpp | 41 ++++++++++----------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index fa27bd71e4ae..2737cb68a5ff 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -297,6 +297,21 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS env.GetController()->WaitActualization(TDuration::Seconds(1)); } +std::vector GatherColumnTablesInfo(TTestEnv& env, ui8 tableCount) { + auto& runtime = *env.GetServer().GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + std::vector ret; + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + TTableInfo tableInfo; + const TString path = Sprintf("/Root/Database/Table%u", tableId); + tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path); + tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId); + ret.emplace_back(tableInfo); + } + return ret; +} + std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { auto init = [&] () { CreateDatabase(env, "Database"); @@ -307,20 +322,11 @@ std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); runtime.SimulateSleep(TDuration::Seconds(10)); initThread.join(); - std::vector ret; - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - TTableInfo tableInfo; - const TString path = Sprintf("/Root/Database/Table%u", tableId); - tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path); - tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId); - ret.emplace_back(tableInfo); - } - return ret; + return GatherColumnTablesInfo(env, tableCount); } std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { @@ -337,9 +343,9 @@ std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 ResolvePathId(runtime, "/Root/Shared", &domainKey); auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); + CreateServerlessDatabase(env, "Database", domainKey); for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - CreateColumnStoreTable(env, "Serverless", Sprintf("Table%u", tableId), shardCount); + CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); } }; std::thread init2Thread(init2); @@ -347,16 +353,7 @@ std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 runtime.SimulateSleep(TDuration::Seconds(5)); init2Thread.join(); - auto sender = runtime.AllocateEdgeActor(); - std::vector ret; - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - TTableInfo tableInfo; - const TString path = Sprintf("/Root/Serverless/Table%u", tableId); - tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path); - tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId); - ret.emplace_back(tableInfo); - } - return ret; + return GatherColumnTablesInfo(env, tableCount); } void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) {