diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp index 9db679c4b724..f5d2dc3515ea 100644 --- a/ydb/core/formats/arrow/hash/calcer.cpp +++ b/ydb/core/formats/arrow/hash/calcer.cpp @@ -9,6 +9,25 @@ namespace NKikimr::NArrow::NHash { +void TXX64::AppendField(const std::shared_ptr& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) { + AFL_VERIFY(scalar); + NArrow::SwitchType(scalar->type->id(), [&](const auto& type) { + using TWrap = std::decay_t; + using T = typename TWrap::T; + using TScalar = typename arrow::TypeTraits::ScalarType; + + auto& typedScalar = static_cast(*scalar); + if constexpr (arrow::has_string_view()) { + hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size()); + } else if constexpr (arrow::has_c_type()) { + hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T)); + } else { + static_assert(arrow::is_decimal_type()); + } + return true; + }); +} + void TXX64::AppendField(const std::shared_ptr& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) { NArrow::SwitchType(array->type_id(), [&](const auto& type) { using TWrap = std::decay_t; @@ -21,12 +40,7 @@ void TXX64::AppendField(const std::shared_ptr& array, const int ro if constexpr (arrow::has_string_view()) { hashCalcer.Update((const ui8*)value.data(), value.size()); } else if constexpr (arrow::has_c_type()) { - if constexpr (arrow::is_physical_integer_type()) { - hashCalcer.Update(reinterpret_cast(&value), sizeof(value)); - } else { - // Do not use bool or floats for sharding - static_assert(arrow::is_boolean_type() || arrow::is_floating_type()); - } + hashCalcer.Update(reinterpret_cast(&value), sizeof(value)); } else { static_assert(arrow::is_decimal_type()); } diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h index 066ba0cb135f..511c8c401874 100644 --- a/ydb/core/formats/arrow/hash/calcer.h +++ b/ydb/core/formats/arrow/hash/calcer.h @@ -30,6 +30,7 @@ class TXX64 { TXX64(const std::vector& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0); static void AppendField(const std::shared_ptr& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer); + static void AppendField(const std::shared_ptr& scalar, NXX64::TStreamStringHashCalcer& hashCalcer); std::optional> Execute(const std::shared_ptr& batch) const; std::shared_ptr ExecuteToArray(const std::shared_ptr& batch, const std::string& hashFieldName) const; }; diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 424d82dc01d8..fadd6bbf76b4 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -20,7 +20,7 @@ enum class AggFunctionId { AGG_MAX = 4, AGG_SUM = 5, }; -struct GroupByOptions : public arrow::compute::ScalarAggregateOptions { +struct GroupByOptions: public arrow::compute::ScalarAggregateOptions { struct Assign { AggFunctionId function = AggFunctionId::AGG_UNSPECIFIED; std::string result_column; @@ -43,6 +43,7 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions { #include #include #include +#include namespace NKikimr::NSsa { @@ -605,6 +606,32 @@ IStepFunction::TPtr TAssign::GetFunction(arrow::compute::ExecContext* c return std::make_shared(ctx); } +TString TAssign::DebugString() const { + TStringBuilder sb; + sb << "{"; + if (Operation != EOperation::Unspecified) { + sb << "op=" << Operation << ";"; + } + if (YqlOperationId) { + sb << "yql_op=" << (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId << ";"; + } + if (Arguments.size()) { + sb << "arguments=["; + for (auto&& i : Arguments) { + sb << i.DebugString() << ";"; + } + sb << "];"; + } + if (Constant) { + sb << "const=" << Constant->ToString() << ";"; + } + if (KernelFunction) { + sb << "kernel=" << KernelFunction->name() << ";"; + } + sb << "column=" << Column.DebugString() << ";"; + sb << "}"; + return sb; +} IStepFunction::TPtr TAggregateAssign::GetFunction(arrow::compute::ExecContext* ctx) const { if (KernelFunction) { diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index 31254144fa93..000bd447b1e5 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -108,6 +108,8 @@ class IStepFunction { }; class TAssign { +private: + YDB_ACCESSOR_DEF(std::optional, YqlOperationId); public: using TOperationType = EOperation; @@ -237,12 +239,7 @@ class TAssign { const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); } IStepFunction::TPtr GetFunction(arrow::compute::ExecContext* ctx) const; - TString DebugString() const { - return TStringBuilder() << - "{op=" << Operation << ";column=" << Column.DebugString() << ";" << (Constant ? "const=" + Constant->ToString() + ";" : "NO;") - << (KernelFunction ? ("kernel=" + KernelFunction->name() + ";") : "NO;") - << "}"; - } + TString DebugString() const; private: const TColumnInfo Column; EOperation Operation{EOperation::Unspecified}; @@ -325,13 +322,26 @@ class TProgramStep { TString DebugString() const { TStringBuilder sb; sb << "{"; - sb << "assignes=["; - for (auto&& i : Assignes) { - sb << i.DebugString() << ";"; + if (Assignes.size()) { + sb << "assignes=["; + for (auto&& i : Assignes) { + sb << i.DebugString() << ";"; + } + sb << "];"; + } + if (Filters.size()) { + sb << "filters=["; + for (auto&& i : Filters) { + sb << i.DebugString() << ";"; + } + sb << "];"; + } + if (GroupBy.size()) { + sb << "group_by_count=" << GroupBy.size() << "; "; + } + if (GroupByKeys.size()) { + sb << "group_by_keys_count=" << GroupByKeys.size() << ";"; } - sb << "];"; - sb << "group_by_count = " << GroupBy.size() << "; "; - sb << "group_by_keys_count=" << GroupByKeys.size() << ";"; sb << "projections=["; for (auto&& i : Projection) { @@ -396,6 +406,7 @@ class TProgramStep { }; struct TProgram { +public: std::vector> Steps; THashMap SourceColumns; diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 32fc6e462d1f..146c5bae1e86 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -18,6 +18,7 @@ PEERDIR( ydb/library/binary_json ydb/library/dynumber ydb/library/services + ydb/library/yql/core/arrow_kernels/request ) IF (OS_WINDOWS) diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 0f9ff8525c00..b225b8987963 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include #include #include @@ -45,8 +48,7 @@ TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpRead } -std::pair SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) -{ +std::pair SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) { const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); std::vector> columns; std::vector> data; @@ -917,11 +919,34 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto if (tableInfo->TableKind == ETableKind::Olap) { auto* olapProgram = protoTaskMeta.MutableOlapProgram(); + auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task); + olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program); - auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task); olapProgram->SetParametersSchema(schema); olapProgram->SetParameters(parameters); + + if (!!stageInfo.Meta.ColumnTableInfoPtr) { + std::shared_ptr olapSchema = std::make_shared(); + olapSchema->ParseFromLocalDB(stageInfo.Meta.ColumnTableInfoPtr->Description.GetSchema()); + if (olapSchema->GetIndexes().GetIndexes().size()) { + NOlap::TProgramContainer container; + NOlap::TSchemaResolverColumnsOnly resolver(olapSchema); + TString error; + YQL_ENSURE(container.Init(resolver, *olapProgram, error), "" << error); + auto data = NOlap::NIndexes::NRequest::TDataForIndexesCheckers::Build(container); + if (data) { + for (auto&& [indexId, i] : olapSchema->GetIndexes().GetIndexes()) { + AFL_VERIFY(!!i.GetIndexMeta()); + i.GetIndexMeta()->FillIndexCheckers(data, *olapSchema); + } + auto checker = data->GetCoverChecker(); + if (!!checker) { + checker.SerializeToProto(*olapProgram->MutableIndexChecker()); + } + } + } + } } else { YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty()); } diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h index 753eade06d8c..267829a1a5f4 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h @@ -1,5 +1,5 @@ #include "abstract.h" -#include +#include namespace NKikimr::NKqp { diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index 0eecc157fd75..746c34ea4a49 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -674,6 +674,7 @@ TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& opera } const auto kernel = ctx.AddYqlKernelBinaryFunc(op, *leftColumn.Type, *rightColumn.Type, type); + cmpFunc->SetYqlOperationId((ui32)op); cmpFunc->SetFunctionType(TProgram::YQL_KERNEL); cmpFunc->SetKernelIdx(kernel.first); cmpFunc->AddArguments()->SetId(leftColumn.Id); @@ -706,8 +707,10 @@ const TTypedColumn BuildLogicalProgram(const TExprNode::TChildrenType& args, con const auto idx = ctx.GetKernelRequestBuilder().AddBinaryOp(function, block, block, block); logicalFunc->SetKernelIdx(idx); logicalFunc->SetFunctionType(TProgram::YQL_KERNEL); + logicalFunc->SetYqlOperationId((ui32)function); } else { logicalFunc->SetFunctionType(function); + logicalFunc->SetId((ui32)function); } return {logicalOp->GetColumn().GetId(), block}; diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index adff2eb29ec5..e38258931b12 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -762,16 +762,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); auto client = kikimr.GetTableClient(); - // EnableDebugLogging(kikimr); - { TStreamExecScanQuerySettings settings; settings.CollectQueryStats(ECollectQueryStatsMode::Full); @@ -872,8 +868,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); @@ -902,8 +896,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); @@ -933,8 +925,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); @@ -1091,8 +1081,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - auto client = kikimr.GetTableClient(); TLocalHelper(kikimr).CreateTestOlapTable(); @@ -1123,8 +1111,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 3); @@ -1175,14 +1161,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); - TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); - // EnableDebugLogging(kikimr); - { auto it = tableClient.StreamExecuteScanQuery(R"( --!syntax_v1 @@ -1198,8 +1180,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[0u;]])"); } - // EnableDebugLogging(kikimr); - { WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000); WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000); @@ -1210,8 +1190,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000); } - // EnableDebugLogging(kikimr); - { auto it = tableClient.StreamExecuteScanQuery(R"( --!syntax_v1 @@ -1243,6 +1221,111 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(Indexes) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + TLocalHelper(kikimr).CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.1}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["resource_id", "uid"], "false_positive_probability" : 0.2}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000); + } + + { + auto it = tableClient.StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT + COUNT(*) + FROM `/Root/olapStore/olapTable` + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << result << Endl; + CompareYson(result, R"([[230000u;]])"); + } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0); + TInstant start = Now(); + ui32 compactionsStart = csController->GetCompactions().Val(); + while (Now() - start < TDuration::Seconds(10)) { + if (compactionsStart != csController->GetCompactions().Val()) { + compactionsStart = csController->GetCompactions().Val(); + start = Now(); + } + Cerr << "WAIT_COMPACTION: " << csController->GetCompactions().Val() << Endl; + Sleep(TDuration::Seconds(1)); + } + + { + auto it = tableClient.StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT + COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE resource_id = '3000008' AND level = 3 AND uid = 'uid_100000008' + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << result << Endl; + CompareYson(result, R"([[1u;]])"); + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val()); + AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + } + + { + const i64 before = csController->GetIndexesSkippingOnSelect().Val(); + auto it = tableClient.StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT + COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222' + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + AFL_VERIFY(before != csController->GetIndexesSkippingOnSelect().Val()); + Cout << result << Endl; + CompareYson(result, R"([[0u;]])"); + AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + } + } + Y_UNIT_TEST(PushdownFilter) { static bool enableLog = false; @@ -1427,7 +1510,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(kikimr).CreateTestOlapTable(); auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2000); -// EnableDebugLogging(kikimr); auto tableClient = kikimr.GetTableClient(); auto selectQuery = TString(R"( @@ -1919,7 +2001,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -1964,7 +2045,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -2011,7 +2091,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -2056,7 +2135,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -2101,7 +2179,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - // EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -2289,7 +2366,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); - //EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -2383,7 +2459,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetForceColumnTablesCompositeMarks(true); TKikimrRunner kikimr(settings); -// EnableDebugLogging(kikimr); TClickHelper(kikimr).CreateClickBenchTable(); auto tableClient = kikimr.GetTableClient(); @@ -2422,7 +2497,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto sender = runtime->AllocateEdgeActor(); InitRoot(server, sender); -// EnableDebugLogging(runtime); TClickHelper(*server).CreateClickBenchTable(); @@ -3416,8 +3490,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 1000); } - // EnableDebugLogging(kikimr); - auto tableClient = kikimr.GetTableClient(); auto selectQuery = TString(R"( SELECT PathId, Kind, TabletId, Sum(Rows) as Rows @@ -3462,8 +3534,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000); } - // EnableDebugLogging(kikimr); - auto tableClient = kikimr.GetTableClient(); { auto selectQuery = TString(R"( @@ -3918,8 +3988,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000); } - // EnableDebugLogging(kikimr); - auto tableClient = kikimr.GetTableClient(); { @@ -4068,8 +4136,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000); } - // EnableDebugLogging(kikimr); - auto tableClient = kikimr.GetTableClient(); { @@ -4470,8 +4536,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - //EnableDebugLogging(kikimr); - auto tableClient = kikimr.GetTableClient(); auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -4668,7 +4732,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Tests::TClient client(serverSettings); auto& runtime = *server->GetRuntime(); - EnableDebugLogging(&runtime); auto sender = runtime.AllocateEdgeActor(); server->SetupRootStoragePools(sender); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 47fec6626cb7..aecd1b44ba2b 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5369,6 +5369,8 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) }; + + Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); TTestHelper::TColumnTable testTable; testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index 69db3462922b..92e5738a3acc 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -40,6 +40,24 @@ message TProgram { } } + message TBloomFilterChecker { + repeated uint32 HashValues = 1; + } + + message TOlapIndexChecker { + optional uint32 IndexId = 1; + optional string ClassName = 2; + + message TCompositeChecker { + repeated TOlapIndexChecker ChildrenCheckers = 1; + } + + oneof Implementation { + TBloomFilterChecker BloomFilter = 40; + TCompositeChecker Composite = 41; + } + } + message TParameter { optional string Name = 1; } @@ -96,6 +114,7 @@ message TProgram { repeated TColumn Arguments = 2; optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ]; optional uint32 KernelIdx = 4; + optional uint32 YqlOperationId = 5; // TKernelRequestBuilder::EBinaryOp } message TExternalFunction { @@ -185,4 +204,5 @@ message TOlapProgram { // RecordBatch deserialization require arrow::Schema, thus store it here optional bytes ParametersSchema = 2; optional bytes Parameters = 3; + optional TProgram.TOlapIndexChecker IndexChecker = 4; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 36d7695ffe75..6dfe9d7e1e2c 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -39,6 +38,7 @@ struct Schema : NIceDb::Schema { ColumnsTableId, CountersTableId, OperationsTableId, + IndexesTableId }; enum class ETierTables: ui32 { @@ -290,6 +290,19 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns; }; + struct IndexIndexes: NIceDb::Schema::Table { + struct PathId: Column<1, NScheme::NTypeIds::Uint64> {}; + struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {}; + struct IndexId: Column<3, NScheme::NTypeIds::Uint32> {}; + struct ChunkIdx: Column<4, NScheme::NTypeIds::Uint32> {}; + struct Blob: Column<5, NScheme::NTypeIds::String> {}; + struct Offset: Column<6, NScheme::NTypeIds::Uint32> {}; + struct Size: Column<7, NScheme::NTypeIds::Uint32> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + using TTables = SchemaTables< Value, TxInfo, @@ -310,7 +323,8 @@ struct Schema : NIceDb::Schema { OneToOneEvictedBlobs, Operations, TierBlobsDraft, - TierBlobsToDelete + TierBlobsToDelete, + IndexIndexes >; // @@ -600,4 +614,27 @@ class TColumnChunkLoadContext { } }; +class TIndexChunkLoadContext { +private: + YDB_READONLY_DEF(TBlobRange, BlobRange); + TChunkAddress Address; +public: + TIndexChunk BuildIndexChunk() const { + return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), BlobRange); + } + + template + TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) + : Address(rowset.template GetValue(), rowset.template GetValue()) { + AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString()); + TString strBlobId = rowset.template GetValue(); + Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size()); + TLogoBlobID logoBlobId((const ui64*)strBlobId.data()); + BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); + BlobRange.Offset = rowset.template GetValue(); + BlobRange.Size = rowset.template GetValue(); + AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString()); + } +}; + } diff --git a/ydb/core/tx/columnshard/common/portion.cpp b/ydb/core/tx/columnshard/common/portion.cpp index 0359f5905d03..bc4c2ea7a075 100644 --- a/ydb/core/tx/columnshard/common/portion.cpp +++ b/ydb/core/tx/columnshard/common/portion.cpp @@ -1,4 +1,6 @@ #include "portion.h" +#include +#include namespace NKikimr::NOlap::NPortion { diff --git a/ydb/core/tx/columnshard/common/portion.h b/ydb/core/tx/columnshard/common/portion.h index 3b828374dc7b..ae35cbcfc23f 100644 --- a/ydb/core/tx/columnshard/common/portion.h +++ b/ydb/core/tx/columnshard/common/portion.h @@ -1,5 +1,6 @@ #pragma once #include +#include namespace NKikimr::NOlap::NPortion { // NOTE: These values are persisted in LocalDB so they must be stable @@ -12,4 +13,12 @@ enum EProduced: ui32 { EVICTED }; +class TSpecialColumns { +public: + static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; + static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; + static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00; + static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1; +}; + } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index f492aacbd43e..631b380e320a 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -181,6 +181,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc for (auto&& p : columnChunks) { portionColumns.emplace(p.first, p.second[i].GetChunks()); } + resultSchema->GetIndexInfo().AppendIndexes(portionColumns); batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings()); } TSimilarSlicer slicer(GetSplitSettings().GetExpectedPortionSize()); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index c608915528c3..39482ad60fdc 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -182,6 +182,14 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { return false; } + if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) { + auto portion = GetGranulePtrVerified(pathId)->GetPortionPtr(portionId); + AFL_VERIFY(portion); + portion->AddIndex(loadContext.BuildIndexChunk()); + })) { + return false; + }; + for (auto&& i : Tables) { i.second->OnAfterPortionsLoad(); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index f3446b1bde6f..20d5989e939b 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -88,8 +88,45 @@ bool TDbWrapper::LoadColumns(const std::function().Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), row.GetChunkIdx()).Update( + NIceDb::TUpdate(row.GetBlobRange().BlobId.SerializeBinary()), + NIceDb::TUpdate(row.GetBlobRange().Offset), + NIceDb::TUpdate(row.GetBlobRange().Size) + ); +} + +void TDbWrapper::EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) { + NIceDb::TNiceDb db(Database); + using IndexIndexes = NColumnShard::Schema::IndexIndexes; + db.Table().Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), 0).Delete(); +} + +bool TDbWrapper::LoadIndexes(const std::function& callback) { + NIceDb::TNiceDb db(Database); + using IndexIndexes = NColumnShard::Schema::IndexIndexes; + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + NOlap::TIndexChunkLoadContext chunkLoadContext(rowset, DsGroupSelector); + callback(rowset.GetValue(), rowset.GetValue(), chunkLoadContext); + + if (!rowset.Next()) { + return false; + } } return true; } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index a90684dbffd7..743f43272b20 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -8,9 +8,11 @@ class TDatabase; namespace NKikimr::NOlap { class TColumnChunkLoadContext; +class TIndexChunkLoadContext; struct TInsertedData; class TInsertTableAccessor; struct TColumnRecord; +class TIndexChunk; struct TGranuleRecord; class IColumnEngine; class TPortionInfo; @@ -33,6 +35,10 @@ class IDbWrapper { virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; virtual bool LoadColumns(const std::function& callback) = 0; + virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0; + virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0; + virtual bool LoadIndexes(const std::function& callback) = 0; + virtual void WriteCounter(ui32 counterId, ui64 value) = 0; virtual bool LoadCounters(const std::function& callback) = 0; }; @@ -57,6 +63,10 @@ class TDbWrapper : public IDbWrapper { void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; bool LoadColumns(const std::function& callback) override; + virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override; + virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override; + virtual bool LoadIndexes(const std::function& callback) override; + void WriteCounter(ui32 counterId, ui64 value) override; bool LoadCounters(const std::function& callback) override; diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index 688ecdb3c20a..3917a0352271 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -16,6 +16,26 @@ namespace NKikimr::NOlap { class TColumnChunkLoadContext; struct TIndexInfo; +class TIndexChunk { +private: + YDB_READONLY(ui32, IndexId, 0); + YDB_READONLY(ui32, ChunkIdx, 0); + YDB_READONLY_DEF(TBlobRange, BlobRange); + +public: + TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const TBlobRange& blobRange) + : IndexId(indexId) + , ChunkIdx(chunkIdx) + , BlobRange(blobRange) { + + } + + void RegisterBlobId(const TUnifiedBlobId& blobId) { +// AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew()); + BlobRange.BlobId = blobId; + } +}; + struct TChunkMeta: public TSimpleChunkMeta { private: using TBase = TSimpleChunkMeta; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index d2fdb926ed02..fdf35132ff1a 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -120,6 +120,16 @@ ui64 TPortionInfo::GetRawBytes(const std::set& entityIds) const { return sum; } +ui64 TPortionInfo::GetIndexBytes(const std::set& entityIds) const { + ui64 sum = 0; + for (auto&& r : Indexes) { + if (entityIds.contains(r.GetIndexId())) { + sum += r.GetBlobRange().Size; + } + } + return sum; +} + int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const { return CompareByColumnIdsImpl(item, info.KeyColumns); } @@ -211,12 +221,20 @@ void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const { for (auto& record : Records) { db.EraseColumn(*this, record); } + for (auto& record : Indexes) { + db.EraseIndex(*this, record); + } } + void TPortionInfo::SaveToDatabase(IDbWrapper& db) const { for (auto& record : Records) { db.WriteColumn(*this, record); } + for (auto& record : Indexes) { + db.WriteIndex(*this, record); + } } + std::shared_ptr TPortionInfo::TPreparedColumn::Assemble() const { Y_ABORT_UNLESS(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index a53c6734893e..225c1381c790 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -1,11 +1,13 @@ #pragma once #include "column_record.h" #include "meta.h" + +#include +#include +#include #include #include -#include -#include -#include + #include namespace NKikimr::NOlap { @@ -25,6 +27,8 @@ class TPortionInfo { TPortionMeta Meta; std::shared_ptr BlobsOperator; ui64 DeprecatedGranuleId = 0; + YDB_READONLY_DEF(std::vector, Indexes); + public: std::vector Records; @@ -37,21 +41,37 @@ class TPortionInfo { } void RegisterBlobId(const TChunkAddress& address, const TUnifiedBlobId& blobId) { - bool found = false; for (auto it = Records.begin(); it != Records.end(); ++it) { if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) { it->RegisterBlobId(blobId); - found = true; - break; + return; } } - AFL_VERIFY(found)("address", address.DebugString()); + for (auto it = Indexes.begin(); it != Indexes.end(); ++it) { + if (it->GetIndexId() == address.GetEntityId() && it->GetChunkIdx() == address.GetChunkIdx()) { + it->RegisterBlobId(blobId); + return; + } + } + AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString()); } void RemoveFromDatabase(IDbWrapper& db) const; void SaveToDatabase(IDbWrapper& db) const; + void AddIndex(const TIndexChunk& chunk) { + ui32 chunkIdx = 0; + for (auto&& i : Indexes) { + if (i.GetIndexId() == chunk.GetIndexId()) { + AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", i.GetChunkIdx()); + ++chunkIdx; + } + } + AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", chunk.GetChunkIdx()); + Indexes.emplace_back(chunk); + } + bool OlderThen(const TPortionInfo& info) const { return RecordSnapshotMin() < info.RecordSnapshotMin(); } @@ -356,6 +376,8 @@ class TPortionInfo { return result; } + ui64 GetIndexBytes(const std::set& columnIds) const; + ui64 GetRawBytes(const std::vector& columnIds) const; ui64 GetRawBytes(const std::set& columnIds) const; ui64 GetRawBytes() const { diff --git a/ydb/core/tx/columnshard/engines/predicate/ya.make b/ydb/core/tx/columnshard/engines/predicate/ya.make index 10d5ff3b45b7..3de8343b0b54 100644 --- a/ydb/core/tx/columnshard/engines/predicate/ya.make +++ b/ydb/core/tx/columnshard/engines/predicate/ya.make @@ -13,4 +13,6 @@ PEERDIR( ydb/core/formats/arrow ) +YQL_LAST_ABI_VERSION() + END() diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h index 9b50c4cfaa12..97fef7f317eb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -2,9 +2,35 @@ #include #include #include +#include namespace NKikimr::NOlap::NPlainReader { +class TIndexesSet { +private: + YDB_READONLY_DEF(std::vector, IndexIds); + YDB_READONLY_DEF(std::set, IndexIdsSet); +public: + TIndexesSet(const std::set& indexIds) + : IndexIds(indexIds.begin(), indexIds.end()) + , IndexIdsSet(indexIds) { + AFL_VERIFY(IndexIds.size() == IndexIdsSet.size())("indexes", JoinSeq(",", IndexIds)); + } + + TIndexesSet(const ui32& indexId) + : IndexIds({indexId}) + , IndexIdsSet({indexId}) { + } + + ui32 GetIndexesCount() const { + return IndexIds.size(); + } + + TString DebugString() const { + return TStringBuilder() << JoinSeq(",", IndexIds); + } +}; + class TColumnsSet { private: YDB_READONLY_DEF(std::set, ColumnIds); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index 78a5da19a253..cf07daf49f8f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -27,6 +27,12 @@ std::shared_ptr TSpecialReadContext } std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const { + std::shared_ptr result = std::make_shared(); + std::shared_ptr current = result; + if (!!IndexChecker) { + current = current->AttachNext(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); + current = current->AttachNext(std::make_shared(IndexChecker)); + } if (!EFColumns->GetColumnsCount()) { TColumnsSet columnsFetch = *FFColumns; if (needSnapshots) { @@ -36,10 +42,8 @@ std::shared_ptr TSpecialReadContext columnsFetch = columnsFetch + *PKColumns + *SpecColumns; } if (columnsFetch.GetColumnsCount()) { - std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "simple"); - std::shared_ptr current = result; + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch), "simple")); current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch))); - return result; } else { return nullptr; } @@ -49,8 +53,7 @@ std::shared_ptr TSpecialReadContext columnsFetch = columnsFetch + *SpecColumns; } AFL_VERIFY(columnsFetch.GetColumnsCount()); - std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "ef"); - std::shared_ptr current = result; + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch), "ef")); if (needSnapshots || FFColumns->Contains(SpecColumns)) { current = current->AttachNext(std::make_shared(SpecColumns)); @@ -72,13 +75,10 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } - return result; } else { TColumnsSet columnsFetch = *MergeColumns + *EFColumns; AFL_VERIFY(columnsFetch.GetColumnsCount()); - std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "full"); - std::shared_ptr current = result; - + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch), "full")); current = current->AttachNext(std::make_shared(SpecColumns)); if (needSnapshots) { current = current->AttachNext(std::make_shared()); @@ -100,8 +100,8 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } - return result; } + return result->GetNextStep(); } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) @@ -113,6 +113,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); SpecColumns = std::make_shared(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); + IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); { auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); if (efColumns.size()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index 3fe05f23d8b3..a8b727a5385a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -19,6 +19,7 @@ class TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, FFColumns); YDB_READONLY_DEF(std::shared_ptr, ProgramInputColumns); + NIndexes::TIndexCheckerContainer IndexChecker; TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr EmptyColumns = std::make_shared(); std::shared_ptr PKFFColumns; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 4c29f220d34f..3e3d5fd7173d 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -49,6 +49,9 @@ class TFetchedData { } std::shared_ptr GetBatch() const { + if (!Table) { + return nullptr; + } return NArrow::ToBatch(Table, true); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 3058b64781c0..0773b0c50a45 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -15,6 +15,10 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { bool TStepAction::DoExecute() { while (Step) { + if (Source->IsEmptyData()) { + FinishedFlag = true; + return true; + } if (!Step->ExecuteInplace(Source, Step)) { return true; } @@ -29,11 +33,20 @@ bool TStepAction::DoExecute() { } bool TBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const { - return !source->StartFetchingColumns(source, step, Columns); + AFL_VERIFY((!!Columns) ^ (!!Indexes)); + + const bool startFetchingColumns = Columns ? source->StartFetchingColumns(source, step, Columns) : false; + const bool startFetchingIndexes = Indexes ? source->StartFetchingIndexes(source, step, Indexes) : false; + return !startFetchingColumns && !startFetchingIndexes; } ui64 TBlobsFetchingStep::PredictRawBytes(const std::shared_ptr& source) const { - return source->GetRawBytes(Columns->GetColumnIds()); + if (Columns) { + return source->GetRawBytes(Columns->GetColumnIds()); + } else { + AFL_VERIFY(Indexes); + return source->GetIndexBytes(Indexes->GetIndexIdsSet()); + } } bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { @@ -68,4 +81,9 @@ bool TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source return true; } +bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + source->ApplyIndex(IndexChecker); + return true; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h index 297863392a7b..6446538535da 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -27,7 +27,9 @@ class IFetchingStep { AFL_VERIFY(nextStep); NextStep = nextStep; nextStep->Index = Index + 1; - nextStep->BranchName = BranchName; + if (!nextStep->BranchName) { + nextStep->BranchName = BranchName; + } return nextStep; } @@ -99,22 +101,66 @@ class TBuildFakeSpec: public IFetchingStep { } }; +class TFakeStep: public IFetchingStep { +private: + using TBase = IFetchingStep; +public: + virtual bool DoExecuteInplace(const std::shared_ptr& /*source*/, const std::shared_ptr& /*step*/) const override { + return true; + } + + TFakeStep() + : TBase("FAKE") + { + + } +}; + +class TApplyIndexStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + const NIndexes::TIndexCheckerContainer IndexChecker; +protected: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; +public: + TApplyIndexStep(const NIndexes::TIndexCheckerContainer& indexChecker) + : TBase("APPLY_INDEX") + , IndexChecker(indexChecker) + { + + } +}; + class TBlobsFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; std::shared_ptr Columns; + std::shared_ptr Indexes; protected: virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const override; virtual ui64 PredictRawBytes(const std::shared_ptr& source) const override; virtual TString DoDebugString() const override { - return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + TStringBuilder sb; + if (Columns) { + sb << "columns=" << Columns->DebugString() << ";"; + } else { + sb << "indexes=" << Indexes->DebugString() << ";"; + } + return sb; } public: TBlobsFetchingStep(const std::shared_ptr& columns, const TString& nameBranch = "") : TBase("FETCHING", nameBranch) - , Columns(columns) - { + , Columns(columns) { AFL_VERIFY(Columns); + AFL_VERIFY(Columns->GetColumnsCount()); + } + + TBlobsFetchingStep(const std::shared_ptr& indexes, const TString& nameBranch = "") + : TBase("FETCHING", nameBranch) + , Indexes(indexes) { + AFL_VERIFY(Indexes); + AFL_VERIFY(Indexes->GetIndexesCount()); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index e6a408933f39..e0118a290b0c 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace NKikimr::NOlap::NPlainReader { @@ -73,10 +74,10 @@ void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)("reading_action", readingAction->GetStorageId())("columns", columnIds.size()); } -bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, - const std::shared_ptr& step, const std::shared_ptr& columns) { +bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName()); - Y_ABORT_UNLESS(columns->GetColumnsCount()); + AFL_VERIFY(columns->GetColumnsCount()); + AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter()); auto& columnIds = columns->GetColumnIds(); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); @@ -88,6 +89,41 @@ bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptrAddNulls(std::move(nullBlocks)); } + if (!readAction->GetExpectedBlobsSize()) { + return false; + } + + std::vector> actions = {readAction}; + auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); + NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); + return true; +} + +bool TPortionDataSource::DoStartFetchingIndexes(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& indexes) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName()); + Y_ABORT_UNLESS(indexes->GetIndexesCount()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); + + auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::" + step->GetName()); + readAction->SetIsBackgroundProcess(false); + { + std::set indexIds; + for (auto&& i : Portion->GetIndexes()) { + if (!indexes->GetIndexIdsSet().contains(i.GetIndexId())) { + continue; + } + indexIds.emplace(i.GetIndexId()); + readAction->AddRange(i.GetBlobRange()); + } + if (indexes->GetIndexIdsSet().size() != indexIds.size()) { + return false; + } + } + + if (!readAction->GetExpectedBlobsSize()) { + return false; + } + std::vector> actions = {readAction}; auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); @@ -97,6 +133,29 @@ bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr> indexBlobs; + std::set indexIds = indexChecker->GetIndexIds(); + for (auto&& i : Portion->GetIndexes()) { + if (!indexIds.contains(i.GetIndexId())) { + continue; + } + indexBlobs[i.GetIndexId()].emplace_back(StageData->ExtractBlob(i.GetBlobRange())); + } + for (auto&& i : indexIds) { + if (!indexBlobs.contains(i)) { + return; + } + } + if (!indexChecker->Check(indexBlobs)) { + NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false); + StageData->AddFilter(NArrow::TColumnFilter::BuildDenyFilter()); + } else { + NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true); + } + return; +} + bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& /*columns*/) { if (ReadStarted) { return false; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index 34f6a1fe0412..b8765b373e7a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -46,8 +46,10 @@ class IDataSource { } virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) = 0; + virtual bool DoStartFetchingIndexes(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& indexes) = 0; virtual void DoAssembleColumns(const std::shared_ptr& columns) = 0; virtual void DoAbort() = 0; + virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: void SetIsReady(); @@ -55,6 +57,10 @@ class IDataSource { return GetStageData().IsEmpty(); } + void ApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) { + return DoApplyIndex(indexMeta); + } + void AssembleColumns(const std::shared_ptr& columns) { if (columns->IsEmpty()) { return; @@ -66,6 +72,11 @@ class IDataSource { AFL_VERIFY(columns); return DoStartFetchingColumns(sourcePtr, step, columns); } + + bool StartFetchingIndexes(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& indexes) { + AFL_VERIFY(indexes); + return DoStartFetchingIndexes(sourcePtr, step, indexes); + } void InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive); std::shared_ptr GetLastPK() const { @@ -76,6 +87,7 @@ class IDataSource { } virtual ui64 GetRawBytes(const std::set& columnIds) const = 0; + virtual ui64 GetIndexBytes(const std::set& indexIds) const = 0; bool IsMergingStarted() const { return MergingStartedFlag; @@ -155,7 +167,9 @@ class TPortionDataSource: public IDataSource { const std::shared_ptr& readingAction, THashMap& nullBlocks, const std::shared_ptr& filter); + virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) override; virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) override; + virtual bool DoStartFetchingIndexes(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& indexes) override; virtual void DoAssembleColumns(const std::shared_ptr& columns) override { auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchema(Portion->GetMinSnapshot()); MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable()); @@ -173,6 +187,10 @@ class TPortionDataSource: public IDataSource { return Portion->GetRawBytes(columnIds); } + virtual ui64 GetIndexBytes(const std::set& columnIds) const override { + return Portion->GetIndexBytes(columnIds); + } + const TPortionInfo& GetPortionInfo() const { return *Portion; } @@ -199,8 +217,14 @@ class TCommittedDataSource: public IDataSource { } - virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, - const std::shared_ptr& step, const std::shared_ptr& columns) override; + virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) override; + virtual bool DoStartFetchingIndexes(const std::shared_ptr& /*sourcePtr*/, const std::shared_ptr& /*step*/, const std::shared_ptr& /*indexes*/) override { + return false; + } + virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& /*indexMeta*/) override { + return; + } + virtual void DoAssembleColumns(const std::shared_ptr& columns) override; virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; @@ -213,6 +237,10 @@ class TCommittedDataSource: public IDataSource { return CommittedBlob.GetBlobRange().Size; } + virtual ui64 GetIndexBytes(const std::set& /*columnIds*/) const override { + return 0; + } + const TCommittedBlob& GetCommitted() const { return CommittedBlob; } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 9da8c6f3d3e2..6bac1a0af456 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -380,6 +380,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& return false; } + for (const auto& idx : schema.GetIndexes()) { + NIndexes::TIndexMetaContainer meta; + AFL_VERIFY(meta.DeserializeFromProto(idx)); + Indexes.emplace(meta->GetIndexId(), meta); + } + for (const auto& col : schema.GetColumns()) { const ui32 id = col.GetId(); const TString& name = col.GetName(); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 2446df2d2a43..b4293f6078fd 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -7,10 +7,12 @@ #include #include +#include #include #include #include #include +#include "indexes/abstract/meta.h" namespace arrow { class Array; @@ -34,6 +36,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { private: THashMap ColumnFeatures; THashMap> ArrowColumnByColumnIdCache; + THashMap Indexes; TIndexInfo(const TString& name); bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const; @@ -41,14 +44,18 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { void BuildArrowSchema(); void InitializeCaches(); public: - static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; - static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; + static constexpr const char* SPEC_COL_PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP; + static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID; static const TString STORE_INDEX_STATS_TABLE; static const TString TABLE_INDEX_STATS_TABLE; + const THashMap& GetIndexes() const { + return Indexes; + } + enum class ESpecialColumn : ui32 { - PLAN_STEP = 0xffffff00, - TX_ID, + PLAN_STEP = NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX, + TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID_INDEX }; TString DebugString() const { @@ -110,6 +117,13 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { std::shared_ptr GetColumnLoaderOptional(const ui32 columnId) const; std::shared_ptr GetColumnLoaderVerified(const ui32 columnId) const; + void AppendIndexes(std::map>>& originalData) const { + for (auto&& i : Indexes) { + std::shared_ptr chunk = i.second->BuildIndex(i.first, originalData, *this); + AFL_VERIFY(originalData.emplace(i.first, std::vector>({chunk})).second); + } + } + /// Returns an id of the column located by name. The name should exists in the schema. ui32 GetColumnId(const std::string& name) const; std::optional GetColumnIdOptional(const std::string& name) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp deleted file mode 100644 index e57c82228544..000000000000 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "abstract.h" -#include -#include -#include -#include -#include - -namespace NKikimr::NOlap::NIndexes { - -} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h deleted file mode 100644 index cb2cb5665ebb..000000000000 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h +++ /dev/null @@ -1,147 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include -#include -#include - -#include -#include - -namespace NKikimr::NOlap { -struct TIndexInfo; -} - -namespace NKikimr::NSchemeShard { -class TOlapSchema; -class IErrorCollector; -} - -namespace NKikimr::NOlap::NIndexes { - -class IIndexChecker { -protected: - virtual bool DoCheck(std::vector&& blobs) const = 0; -public: - virtual ~IIndexChecker() = default; - bool Check(std::vector&& blobs) const { - return DoCheck(std::move(blobs)); - } -}; - -class TIndexCheckerContainer { -private: - YDB_READONLY(ui32, IndexId, 0); - YDB_READONLY_DEF(std::shared_ptr, Object); -public: - TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr& object) - : IndexId(indexId) - , Object(object) { - AFL_VERIFY(IndexId); - AFL_VERIFY(Object); - } - - const IIndexChecker* operator->() const { - return Object.get(); - } -}; - -class IIndexMeta { -protected: - virtual std::shared_ptr DoBuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const = 0; - virtual std::shared_ptr DoBuildIndexChecker(const TProgramContainer& program) const = 0; - virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0; - virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0; - -public: - using TFactory = NObjectFactory::TObjectFactory; - using TProto = NKikimrSchemeOp::TOlapIndexDescription; - - virtual ~IIndexMeta() = default; - - std::shared_ptr BuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const { - return DoBuildIndex(indexId, data, indexInfo); - } - - std::shared_ptr BuildIndexChecker(const TProgramContainer& program) const { - return DoBuildIndexChecker(program); - } - - bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { - return DoDeserializeFromProto(proto); - } - - void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const { - return DoSerializeToProto(proto); - } - - virtual TString GetClassName() const = 0; -}; - -class IIndexMetaConstructor { -protected: - virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; - virtual std::shared_ptr DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0; - virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0; - virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0; -public: - using TFactory = NObjectFactory::TObjectFactory; - using TProto = NKikimrSchemeOp::TOlapIndexRequested; - - virtual ~IIndexMetaConstructor() = default; - - TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { - return DoDeserializeFromJson(jsonInfo); - } - - std::shared_ptr CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { - return DoCreateIndexMeta(currentSchema, errors); - } - - TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { - return DoDeserializeFromProto(proto); - } - - void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { - return DoSerializeToProto(proto); - } - - virtual TString GetClassName() const = 0; -}; - -class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer { -private: - using TBase = NBackgroundTasks::TInterfaceProtoContainer; - YDB_READONLY(ui32, IndexId, 0); -public: - TIndexMetaContainer() = default; - TIndexMetaContainer(const ui32 indexId, const std::shared_ptr& object) - : TBase(object) - , IndexId(indexId) - { - AFL_VERIFY(IndexId); - AFL_VERIFY(Object); - } - - bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { - if (!TBase::DeserializeFromProto(proto)) { - return false; - } - IndexId = proto.GetId(); - return true; - } - - std::optional BuildIndexChecker(const TProgramContainer& program) const { - auto checker = GetObjectPtr()->BuildIndexChecker(program); - if (!checker) { - return {}; - } - return TIndexCheckerContainer(IndexId, checker); - } -}; - -} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp new file mode 100644 index 000000000000..6e58ded13325 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.cpp @@ -0,0 +1,5 @@ +#include "checker.h" + +namespace NKikimr::NOlap::NIndexes { + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h new file mode 100644 index 000000000000..6258ad293389 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/checker.h @@ -0,0 +1,49 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NIndexes { + +class IIndexChecker { +protected: + virtual bool DoCheck(const THashMap>& blobs) const = 0; + virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0; + virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0; + virtual std::set DoGetIndexIds() const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory; + using TProto = NKikimrSSA::TProgram::TOlapIndexChecker; + virtual ~IIndexChecker() = default; + bool Check(const THashMap>& blobs) const { + return DoCheck(blobs); + } + + bool DeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) { + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const { + return DoSerializeToProto(proto); + } + + std::set GetIndexIds() const { + return DoGetIndexIds(); + } + + virtual TString GetClassName() const = 0; +}; + +class TIndexCheckerContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer; +public: + TIndexCheckerContainer() = default; + TIndexCheckerContainer(const std::shared_ptr& object) + : TBase(object) { + AFL_VERIFY(Object); + } +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp new file mode 100644 index 000000000000..c25fb8f16097 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.cpp @@ -0,0 +1,5 @@ +#include "composite.h" + +namespace NKikimr::NOlap::NIndexes { + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h new file mode 100644 index 000000000000..04d6646c9b63 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h @@ -0,0 +1,96 @@ +#pragma once +#include "checker.h" + +namespace NKikimr::NOlap::NIndexes { + +class TCompositeIndexChecker: public IIndexChecker { +protected: + std::vector Checkers; +protected: + virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override { + for (auto&& i : proto.GetComposite().GetChildrenCheckers()) { + TIndexCheckerContainer container; + AFL_VERIFY(container.DeserializeFromProto(i)); + Checkers.emplace_back(std::move(container)); + } + return true; + } + virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override { + for (auto&& i : Checkers) { + i.SerializeToProto(*proto.MutableComposite()->AddChildrenCheckers()); + } + } + virtual std::set DoGetIndexIds() const override { + std::set result; + for (auto&& i : Checkers) { + auto ids = i->GetIndexIds(); + result.insert(ids.begin(), ids.end()); + } + return result; + } +public: + TCompositeIndexChecker() = default; + TCompositeIndexChecker(const std::vector& checkers) + : Checkers(checkers) { + + } + TCompositeIndexChecker(const std::vector>& checkers) { + for (auto&& i : checkers) { + Checkers.emplace_back(i); + } + } +}; + +class TAndIndexChecker: public TCompositeIndexChecker { +private: + using TBase = TCompositeIndexChecker; +public: + static TString GetClassNameStatic() { + return "AND_FILTERS"; + } +private: + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual bool DoCheck(const THashMap>& blobsByIndexId) const override { + for (auto&& i : Checkers) { + if (!i->Check(blobsByIndexId)) { + return false; + } + } + return true; + } +public: + using TBase::TBase; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +class TOrIndexChecker: public TCompositeIndexChecker { +private: + using TBase = TCompositeIndexChecker; +public: + static TString GetClassNameStatic() { + return "OR_FILTERS"; + } +private: + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual bool DoCheck(const THashMap>& blobsByIndexId) const override { + for (auto&& i : Checkers) { + if (i->Check(blobsByIndexId)) { + return true; + } + } + return false; + } +public: + using TBase::TBase; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp new file mode 100644 index 000000000000..a93507bec06f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.cpp @@ -0,0 +1,5 @@ +#include "constructor.h" + +namespace NKikimr::NOlap::NIndexes { + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h new file mode 100644 index 000000000000..d8c820cffa2c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h @@ -0,0 +1,48 @@ +#pragma once +#include "meta.h" + +#include +#include +#include + +#include + +namespace NKikimr::NSchemeShard { +class TOlapSchema; +} + +namespace NKikimr::NOlap::NIndexes { + +class IIndexMetaConstructor { +protected: + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; + virtual std::shared_ptr DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0; + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory; + using TProto = NKikimrSchemeOp::TOlapIndexRequested; + + virtual ~IIndexMetaConstructor() = default; + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + return DoDeserializeFromJson(jsonInfo); + } + + std::shared_ptr CreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + return DoCreateIndexMeta(indexId, currentSchema, errors); + } + + TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + return DoSerializeToProto(proto); + } + + virtual TString GetClassName() const = 0; +}; + + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp new file mode 100644 index 000000000000..c64d9b11a298 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp @@ -0,0 +1,40 @@ +#include "meta.h" +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NOlap::NIndexes { + +void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const { + portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), bRange)); +} + +std::shared_ptr TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const { + std::vector columnReaders; + for (auto&& i : ColumnIds) { + auto it = data.find(i); + AFL_VERIFY(it != data.end()); + columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i)); + } + TChunkedBatchReader reader(std::move(columnReaders)); + std::shared_ptr indexBatch = DoBuildIndexImpl(reader); + const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch); + return std::make_shared(indexId, indexData); +} + +bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) { + Serializer = std::make_shared(arrow::ipc::IpcWriteOptions::Defaults()); + return true; +} + +TIndexByColumns::TIndexByColumns(const ui32 indexId, const std::set& columnIds) + : TBase(indexId) + , ColumnIds(columnIds) +{ + Serializer = std::make_shared(arrow::ipc::IpcWriteOptions::Defaults()); +} + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h new file mode 100644 index 000000000000..d80da770d1b6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h @@ -0,0 +1,134 @@ +#pragma once +#include "checker.h" +#include "program.h" + +#include +#include +#include + +#include + +namespace NYql::NNodes { +class TExprBase; +} + +namespace NKikimr::NOlap { +struct TIndexInfo; +class TProgramContainer; +} + +namespace NKikimr::NSchemeShard { +class TOlapSchema; +} + +namespace NKikimr::NOlap::NIndexes { + +class IIndexMeta { +private: + YDB_READONLY(ui32, IndexId, 0); +protected: + virtual std::shared_ptr DoBuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const = 0; + virtual void DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory; + using TProto = NKikimrSchemeOp::TOlapIndexDescription; + + IIndexMeta() = default; + IIndexMeta(const ui32 indexId) + : IndexId(indexId) + { + + } + + virtual ~IIndexMeta() = default; + + std::shared_ptr BuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const { + return DoBuildIndex(indexId, data, indexInfo); + } + + void FillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const { + return DoFillIndexCheckers(info, schema); + } + + bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) { + IndexId = proto.GetId(); + AFL_VERIFY(IndexId); + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const { + AFL_VERIFY(IndexId); + proto.SetId(IndexId); + return DoSerializeToProto(proto); + } + + virtual TString GetClassName() const = 0; +}; + +class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer; +public: + TIndexMetaContainer() = default; + TIndexMetaContainer(const std::shared_ptr& object) + : TBase(object) + { + AFL_VERIFY(Object); + } +}; + +class TPortionIndexChunk: public IPortionDataChunk { +private: + using TBase = IPortionDataChunk; + const TString Data; +protected: + virtual const TString& DoGetData() const override { + return Data; + } + virtual TString DoDebugString() const override { + return ""; + } + virtual std::vector> DoInternalSplit(const TColumnSaver& /*saver*/, const std::shared_ptr& /*counters*/, const std::vector& /*splitSizes*/) const override { + return {}; + } + virtual bool DoIsSplittable() const override { + return false; + } + virtual std::optional DoGetRecordsCount() const override { + return {}; + } + virtual std::shared_ptr DoGetFirstScalar() const override { + return nullptr; + } + virtual std::shared_ptr DoGetLastScalar() const override { + return nullptr; + } + virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override; +public: + TPortionIndexChunk(const ui32 entityId, const TString& data) + : TBase(entityId, 0) + , Data(data) + { + } + +}; + +class TIndexByColumns: public IIndexMeta { +private: + using TBase = IIndexMeta; + std::shared_ptr Serializer; +protected: + std::set ColumnIds; + virtual std::shared_ptr DoBuildIndexImpl(TChunkedBatchReader& reader) const = 0; + + virtual std::shared_ptr DoBuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const override final; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) override; +public: + TIndexByColumns() = default; + TIndexByColumns(const ui32 indexId, const std::set& columnIds); +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp new file mode 100644 index 000000000000..d16f5fcfb33f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp @@ -0,0 +1,512 @@ +#include "program.h" +#include "composite.h" +#include + +namespace NKikimr::NOlap::NIndexes::NRequest { + +class IRequestNode { +protected: + TString Name; + std::vector> Children; + IRequestNode* Parent = nullptr; + virtual bool DoCollapse() = 0; + + virtual NJson::TJsonValue DoSerializeToJson() const = 0; + virtual std::shared_ptr DoCopy() const = 0; + +public: + template + T* FindFirst() const { + for (auto&& c : Children) { + if (auto* result = c->As()) { + return result; + } + } + return nullptr; + } + + std::shared_ptr Copy() const { + auto selfCopy = DoCopy(); + selfCopy->Parent = nullptr; + selfCopy->Name = GetNextId(Name); + AFL_VERIFY(selfCopy); + for (auto&& i : Children) { + selfCopy->Children.emplace_back(i->Copy()); + } + for (auto&& i : selfCopy->Children) { + i->Parent = selfCopy.get(); + } + return selfCopy; + } + + const TString& GetName() const { + return Name; + } + const std::vector>& GetChildren() const { + return Children; + } + + static TString GetNextId(const TString& originalName) { + static TAtomic Counter = 0; + TStringBuf sb(originalName.data(), originalName.size()); + TStringBuf left; + TStringBuf right; + if (sb.TrySplit('$', left, right)) { + return TString(left.data(), left.size()) + "$" + ::ToString(AtomicIncrement(Counter)); + } else { + return originalName + "$" + ::ToString(AtomicIncrement(Counter)); + } + } + + IRequestNode(const TString& name) + : Name(name) { + + } + + IRequestNode(const std::string& name) + : Name(name.data(), name.size()) { + + } + + IRequestNode(const char* name) + : Name(name) { + + } + + virtual ~IRequestNode() = default; + + template + bool Is() const { + return dynamic_cast(this); + } + + template + T* As() { + return dynamic_cast(this); + } + + void RemoveChildren(const TString& name) { + auto nameCopy = name; + const auto pred = [nameCopy](const std::shared_ptr& child) { + if (child->GetNodeName() == nameCopy) { + child->Parent = nullptr; + return true; + } else { + return false; + } + }; + const ui32 sizeBefore = Children.size(); + Children.erase(std::remove_if(Children.begin(), Children.end(), pred), Children.end()); + AFL_VERIFY(sizeBefore == Children.size() + 1); + } + + const TString& GetNodeName() const { + return Name; + } + + virtual bool Collapse() { + for (auto&& i : Children) { + if (i->Collapse()) { + return true; + } + } + if (DoCollapse()) { + return true; + } + return false; + } + + void Attach(const std::vector>& children) { + auto copy = children; + for (auto&& c : copy) { + Attach(c); + } + } + + void Attach(const std::shared_ptr& children) { + auto copy = children; + if (copy->Parent) { + copy->Parent->RemoveChildren(copy->GetNodeName()); + } + copy->Parent = this; + for (auto&& i : Children) { + AFL_VERIFY(i->GetName() != copy->GetName()); + } + Children.emplace_back(copy); + } + + void Exchange(const TString& name, const std::shared_ptr& children) { + auto copy = children; + for (auto&& i : Children) { + if (i->GetName() == name) { + i = copy; + i->Parent = this; + return; + } + } + AFL_VERIFY(false); + } + + NJson::TJsonValue SerializeToJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(Name, DoSerializeToJson()); + if (Children.size()) { + auto& childrenJson = result.InsertValue("children", NJson::JSON_ARRAY); + for (auto&& i : Children) { + childrenJson.AppendValue(i->SerializeToJson()); + } + } + return result; + } +}; + +class TConstantNode: public IRequestNode { +private: + using TBase = IRequestNode; + YDB_READONLY_DEF(std::shared_ptr, Constant); +protected: + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "const"); + result.InsertValue("const", Constant->ToString()); + return result; + } + virtual bool DoCollapse() override { + return false; + } + virtual std::shared_ptr DoCopy() const override { + return std::make_shared(GetName(), Constant); + } +public: + TConstantNode(const std::string& name, const std::shared_ptr& constant) + : TBase(name) + , Constant(constant) { + } +}; + +class TRootNode: public IRequestNode { +private: + using TBase = IRequestNode; +protected: + virtual bool DoCollapse() override { + return false; + } + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "ROOT"); + return result; + } + + virtual std::shared_ptr DoCopy() const override { + return nullptr; + } +public: + TRootNode() + : TBase("ROOT") { + + } +}; + +class TOriginalColumn: public IRequestNode { +private: + using TBase = IRequestNode; + YDB_READONLY_DEF(TString, ColumnName); +protected: + virtual bool DoCollapse() override { + return false; + } + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "column"); + result.InsertValue("column_name", ColumnName); + return result; + } + virtual std::shared_ptr DoCopy() const override { + return std::make_shared(GetName()); + } +public: + TOriginalColumn(const std::string& columnName) + : TBase(GetNextId(TString(columnName.data(), columnName.size()))) + , ColumnName(columnName.data(), columnName.size()) { + + } +}; + +class TPackAnd: public IRequestNode { +private: + using TBase = IRequestNode; + THashMap> Conditions; + bool IsEmptyFlag = false; +protected: + virtual bool DoCollapse() override { + return false; + } + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "pack_and"); + if (IsEmptyFlag) { + result.InsertValue("empty", true); + } + auto& arrJson = result.InsertValue("conditions", NJson::JSON_ARRAY); + for (auto&& i : Conditions) { + auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); + jsonCondition.InsertValue(i.first, i.second->ToString()); + } + return result; + } + virtual std::shared_ptr DoCopy() const override { + return std::make_shared(*this); + } +public: + TPackAnd(const TPackAnd&) = default; + TPackAnd(const TString& cName, const std::shared_ptr& value) + : TBase(GetNextId("PackAnd")) { + AddCondition(cName, value); + } + + const THashMap>& GetEquals() const { + return Conditions; + } + + bool IsEmpty() const { + return IsEmptyFlag; + } + void AddCondition(const TString& cName, const std::shared_ptr& value) { + AFL_VERIFY(value); + auto it = Conditions.find(cName); + if (it == Conditions.end()) { + Conditions.emplace(cName, value); + } else if (it->second->Equals(*value)) { + return; + } else { + IsEmptyFlag = true; + } + } + void Merge(const TPackAnd& add) { + for (auto&& i : add.Conditions) { + AddCondition(i.first, i.second); + } + } +}; + +class TOperationNode: public IRequestNode { +private: + using TBase = IRequestNode; + NYql::TKernelRequestBuilder::EBinaryOp Operation; +protected: + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "operation"); + result.InsertValue("operation", ::ToString(Operation)); + return result; + } + + virtual bool DoCollapse() override { + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Coalesce) { + AFL_VERIFY(Children.size() == 2); + AFL_VERIFY(Children[1]->Is()); + Parent->Attach(Children[0]); + Parent->RemoveChildren(GetNodeName()); + return true; + } + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Equals && Children.size() == 2 && Children[1]->Is() && Children[0]->Is()) { + Parent->Exchange(GetNodeName(), std::make_shared(Children[0]->As()->GetColumnName(), Children[1]->As()->GetConstant())); + return true; + } + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { + if (Parent->Is() && Parent->As()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { + Parent->Attach(Children); + Parent->RemoveChildren(GetNodeName()); + return true; + } + } + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) { + if (Parent->Is() && Parent->As()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) { + Parent->Attach(Children); + Parent->RemoveChildren(GetNodeName()); + return true; + } + } + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { + auto copy = Children; + TPackAnd* baseSet = nullptr; + bool changed = false; + for (auto&& c : copy) { + if (c->Is()) { + if (baseSet) { + baseSet->Merge(*c->As()); + RemoveChildren(c->GetNodeName()); + changed = true; + } else { + baseSet = c->As(); + } + } + } + if (changed) { + return true; + } + } + + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And && Children.size() == 1) { + AFL_VERIFY(Children.front()->Is()); + Parent->Exchange(GetNodeName(), Children.front()); + return true; + } + + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { + std::vector> newNodes; + std::set cNames; + for (auto&& i : Children) { + if (i->Is() && i->As()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::Or) { + auto orNode = i; + RemoveChildren(i->GetNodeName()); + auto copy = orNode->GetChildren(); + auto copyChildren = Children; + for (auto&& orNodeChildren : copy) { + std::vector> producedChildren; + for (auto&& c : copyChildren) { + producedChildren.emplace_back(c->Copy()); + } + producedChildren.emplace_back(orNodeChildren->Copy()); + newNodes.emplace_back(std::make_shared(GetNextId(Name), NYql::TKernelRequestBuilder::EBinaryOp::And, producedChildren)); + } + Parent->Exchange(GetNodeName(), std::make_shared(GetNextId(orNode->GetName()), NYql::TKernelRequestBuilder::EBinaryOp::Or, newNodes)); + return true; + } + } + } + return false; + } + virtual std::shared_ptr DoCopy() const override { + std::vector> children; + return std::make_shared(GetName(), Operation, children); + } +public: + NYql::TKernelRequestBuilder::EBinaryOp GetOperation() const { + return Operation; + } + + TOperationNode(const std::string& name, const NYql::TKernelRequestBuilder::EBinaryOp& operation, const std::vector>& args) + : TBase(name) + , Operation(operation) { + for (auto&& i : args) { + Attach(i); + } + } +}; + +class TNormalForm { +private: + std::map> Nodes; +public: + TNormalForm() = default; + + bool Add(const NSsa::TAssign& assign) { + std::vector> argNodes; + for (auto&& arg : assign.GetArguments()) { + if (arg.IsGenerated()) { + auto it = Nodes.find(arg.GetColumnName()); + AFL_VERIFY(it != Nodes.end()); + argNodes.emplace_back(it->second); + } else { + argNodes.emplace_back(std::make_shared(arg.GetColumnName())); + } + } + for (auto&& i : argNodes) { + Nodes.erase(i->GetNodeName()); + } + + if (assign.IsConstant()) { + AFL_VERIFY(argNodes.size() == 0); + Nodes.emplace(assign.GetName(), std::make_shared(assign.GetName(), assign.GetConstant())); + } else if (!!assign.GetYqlOperationId()) { + Nodes.emplace(assign.GetName(), std::make_shared(assign.GetName(), (NYql::TKernelRequestBuilder::EBinaryOp)*assign.GetYqlOperationId(), argNodes)); + } else { + return false; + } + return true; + } + + std::shared_ptr GetRootNode() { + if (Nodes.empty()) { + return nullptr; + } + AFL_VERIFY(Nodes.size() == 1); + auto result = std::make_shared(); + result->Attach(Nodes.begin()->second); + return result; + } +}; + +std::shared_ptr TDataForIndexesCheckers::Build(const TProgramContainer& program) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", program.DebugString()); + auto fStep = program.GetSteps().front(); + TNormalForm nForm; + for (auto&& s : fStep->GetAssignes()) { + if (!nForm.Add(s)) { + return nullptr; + } + } + auto rootNode = nForm.GetRootNode(); + if (!rootNode) { + return nullptr; + } + while (rootNode->Collapse()) { + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("collapsed_program", rootNode->SerializeToJson()); + if (rootNode->GetChildren().size() != 1) { + return nullptr; + } + std::shared_ptr result = std::make_shared(); + if (auto* orNode = rootNode->GetChildren().front()->As()) { + if (orNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::Or) { + for (auto&& i : orNode->GetChildren()) { + if (auto* andPackNode = i->As()) { + result->AddBranch(andPackNode->GetEquals()); + } else if (auto* operationNode = i->As()) { + if (operationNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::And) { + TPackAnd* pack = operationNode->FindFirst(); + if (!pack) { + return nullptr; + } + result->AddBranch(pack->GetEquals()); + } + } else { + return nullptr; + } + } + } + } else if (auto* andPackNode = rootNode->GetChildren().front()->As()) { + result->AddBranch(andPackNode->GetEquals()); + } else { + return nullptr; + } + return result; +} + +TIndexCheckerContainer TDataForIndexesCheckers::GetCoverChecker() const { + std::vector> andCheckers; + for (auto&& i : Branches) { + auto andChecker = i->GetAndChecker(); + if (!andChecker) { + return TIndexCheckerContainer(); + } + andCheckers.emplace_back(andChecker); + } + if (andCheckers.size() == 0) { + return TIndexCheckerContainer(); + } else if (andCheckers.size() == 1) { + return andCheckers.front(); + } else { + return TIndexCheckerContainer(std::make_shared(andCheckers)); + } +} + +std::shared_ptr TBranchCoverage::GetAndChecker() const { + if (Indexes.empty()) { + return nullptr; + } + return std::make_shared(Indexes); +} + +} // namespace NKikimr::NOlap::NIndexes::NRequest \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h new file mode 100644 index 000000000000..898c4210b035 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h @@ -0,0 +1,37 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NIndexes::NRequest { + +class TBranchCoverage { +private: + THashMap> Equals; + YDB_ACCESSOR_DEF(std::vector>, Indexes); +public: + TBranchCoverage(const THashMap>& equals) + : Equals(equals) + { + + } + + const THashMap>& GetEquals() const { + return Equals; + } + + std::shared_ptr GetAndChecker() const; +}; + +class TDataForIndexesCheckers { +private: + YDB_READONLY_DEF(std::vector>, Branches); +public: + void AddBranch(const THashMap>& equalsData) { + Branches.emplace_back(std::make_shared(equalsData)); + } + + static std::shared_ptr Build(const TProgramContainer& program); + + TIndexCheckerContainer GetCoverChecker() const; +}; + +} // namespace NKikimr::NOlap::NIndexes::NRequest \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp new file mode 100644 index 000000000000..3c80059ff05b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.cpp @@ -0,0 +1,5 @@ +#include "simple.h" + +namespace NKikimr::NOlap::NIndexes { + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h new file mode 100644 index 000000000000..6fa589899a10 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h @@ -0,0 +1,42 @@ +#pragma once +#include "checker.h" + +namespace NKikimr::NOlap::NIndexes { + +class TSimpleIndexChecker: public IIndexChecker { +private: + YDB_READONLY(ui32, IndexId, 0); +protected: + virtual bool DoCheckImpl(const std::vector& blobs) const = 0; + + virtual bool DoCheck(const THashMap>& blobs) const override final { + auto it = blobs.find(IndexId); + AFL_VERIFY(it != blobs.end()); + return DoCheckImpl(std::move(it->second)); + } + virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0; + virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0; + + virtual bool DoDeserializeFromProto(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override final { + IndexId = proto.GetIndexId(); + AFL_VERIFY(IndexId); + return DoDeserializeFromProtoImpl(proto); + } + virtual void DoSerializeToProto(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override final { + AFL_VERIFY(IndexId); + proto.SetIndexId(IndexId); + return DoSerializeToProtoImpl(proto); + } + virtual std::set DoGetIndexIds() const override final { + return {IndexId}; + } +public: + TSimpleIndexChecker() = default; + TSimpleIndexChecker(const ui32 indexId) + : IndexId(indexId) + { + + } +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make new file mode 100644 index 000000000000..f5babe949751 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + constructor.cpp + meta.cpp + checker.cpp + program.cpp + GLOBAL composite.cpp + simple.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp new file mode 100644 index 000000000000..ac368bf8a467 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp @@ -0,0 +1,49 @@ +#include "checker.h" +#include +#include +#include +#include + +namespace NKikimr::NOlap::NIndexes { + +void TBloomFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const { + for (auto&& i : HashValues) { + proto.MutableBloomFilter()->AddHashValues(i); + } +} + +bool TBloomFilterChecker::DoCheckImpl(const std::vector& blobs) const { + for (auto&& blob : blobs) { + auto rb = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(blob)); + AFL_VERIFY(rb); + AFL_VERIFY(rb->schema()->num_fields() == 1); + AFL_VERIFY(rb->schema()->field(0)->type()->id() == arrow::Type::BOOL); + auto& bArray = static_cast(*rb->column(0)); + bool found = true; + for (auto&& i : HashValues) { + if (!bArray.Value(i % bArray.length())) { + found = false; + break; + } + } + if (found) { + return true; + } + } + return false; +} + +bool TBloomFilterChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) { + if (!proto.HasBloomFilter()) { + return false; + } + for (auto&& i : proto.GetBloomFilter().GetHashValues()) { + HashValues.emplace(i); + } + if (HashValues.empty()) { + return false; + } + return true; +} + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h new file mode 100644 index 000000000000..92ecf9534d29 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.h @@ -0,0 +1,32 @@ +#pragma once +#include +namespace NKikimr::NOlap::NIndexes { + +class TBloomFilterChecker: public TSimpleIndexChecker { +public: + static TString GetClassNameStatic() { + return "BLOOM_FILTER"; + } +private: + using TBase = TSimpleIndexChecker; + std::set HashValues; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override; + virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override; + + virtual bool DoCheckImpl(const std::vector& blobs) const override; +public: + TBloomFilterChecker() = default; + TBloomFilterChecker(const ui32 indexId, std::set&& hashes) + : TBase(indexId) + , HashValues(std::move(hashes)) + { + + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp similarity index 70% rename from ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp rename to ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp index 62b8d5ddc259..4ea787eb7d94 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.cpp @@ -1,35 +1,11 @@ -#include "bloom.h" -#include -#include +#include "constructor.h" +#include "meta.h" + #include -#include namespace NKikimr::NOlap::NIndexes { -std::shared_ptr TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { - std::vector flags; - flags.resize(BitsCount, false); - for (ui32 i = 0; i < HashesCount; ++i) { - NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i); - for (; reader.IsCorrect(); reader.ReadNext()) { - hashCalcer.Start(); - for (auto&& i : reader) { - NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer); - } - flags[hashCalcer.Finish() % BitsCount] = true; - } - } - - arrow::BooleanBuilder builder; - auto res = builder.Reserve(flags.size()); - NArrow::TStatusValidator::Validate(builder.AppendValues(flags)); - std::shared_ptr out; - NArrow::TStatusValidator::Validate(builder.Finish(&out)); - - return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out}); -} - -std::shared_ptr TBloomIndexConstructor::DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { +std::shared_ptr TBloomIndexConstructor::DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { std::set columnIds; for (auto&& i : ColumnNames) { auto* columnInfo = currentSchema.GetColumns().GetByName(i); @@ -39,7 +15,7 @@ std::shared_ptr TBloomIndexConstructor::Do } AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); } - return std::make_shared(columnIds, FalsePositiveProbability); + return std::make_shared(indexId, columnIds, FalsePositiveProbability); } NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h new file mode 100644 index 000000000000..0aeb4dd9809c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/constructor.h @@ -0,0 +1,30 @@ +#pragma once +#include +namespace NKikimr::NOlap::NIndexes { + +class TBloomIndexConstructor: public IIndexMetaConstructor { +public: + static TString GetClassNameStatic() { + return "BLOOM_FILTER"; + } +private: + std::set ColumnNames; + double FalsePositiveProbability = 0.1; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual std::shared_ptr DoCreateIndexMeta(const ui32 indexId, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; + +public: + TBloomIndexConstructor() = default; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp new file mode 100644 index 000000000000..f24100e81d32 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp @@ -0,0 +1,67 @@ +#include "meta.h" +#include "checker.h" +#include +#include +#include +#include + +#include +#include + +namespace NKikimr::NOlap::NIndexes { + +std::shared_ptr TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { + std::vector flags; + flags.resize(BitsCount, false); + for (ui32 i = 0; i < HashesCount; ++i) { + NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i); + for (; reader.IsCorrect(); reader.ReadNext()) { + hashCalcer.Start(); + for (auto&& i : reader) { + NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer); + } + flags[hashCalcer.Finish() % BitsCount] = true; + } + } + + arrow::BooleanBuilder builder; + auto res = builder.Reserve(flags.size()); + NArrow::TStatusValidator::Validate(builder.AppendValues(flags)); + std::shared_ptr out; + NArrow::TStatusValidator::Validate(builder.Finish(&out)); + + return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out}); +} + +void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const { + for (auto&& branch : info->GetBranches()) { + std::map> foundColumns; + for (auto&& cId : ColumnIds) { + auto c = schema.GetColumns().GetById(cId); + if (!c) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect index column")("id", cId); + return; + } + auto itEqual = branch->GetEquals().find(c->GetName()); + if (itEqual == branch->GetEquals().end()) { + break; + } + foundColumns.emplace(cId, itEqual->second); + } + if (foundColumns.size() != ColumnIds.size()) { + continue; + } + std::set hashes; + for (ui32 i = 0; i < HashesCount; ++i) { + NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(3 * i); + calcer.Start(); + for (auto&& i : foundColumns) { + NArrow::NHash::TXX64::AppendField(i.second, calcer); + } + hashes.emplace(calcer.Finish()); + } + branch->MutableIndexes().emplace_back(std::make_shared(GetIndexId(), std::move(hashes))); + } +} + +} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h similarity index 51% rename from ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h rename to ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h index 15ed36ff3e56..bc4db867d607 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h @@ -1,45 +1,7 @@ #pragma once -#include "abstract.h" - -#include -#include - -#include -#include -#include -#include -#include - -#include -#include - +#include namespace NKikimr::NOlap::NIndexes { -class TBloomIndexConstructor: public IIndexMetaConstructor { -public: - static TString GetClassNameStatic() { - return "BLOOM_FILTER"; - } -private: - std::set ColumnNames; - double FalsePositiveProbability = 0.1; - static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); -protected: - virtual std::shared_ptr DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; - - virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; - - virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; - virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; - -public: - TBloomIndexConstructor() = default; - - virtual TString GetClassName() const override { - return GetClassNameStatic(); - } -}; - class TBloomIndexMeta: public TIndexByColumns { public: static TString GetClassNameStatic() { @@ -54,18 +16,20 @@ class TBloomIndexMeta: public TIndexByColumns { ui32 BitsCount = 0; static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); void Initialize() { + AFL_VERIFY(!ResultSchema); + std::vector> fields = {std::make_shared("", arrow::TypeTraits::type_singleton())}; + ResultSchema = std::make_shared(fields); AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability > 0.01); HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2); - BitsCount = RowsCountExpectation * HashesCount / (std::log(2)); + BitsCount = RowsCountExpectation * HashesCount / std::log(2); } protected: - virtual std::shared_ptr DoBuildIndexChecker(const TProgramContainer& /*program*/) const override { - return nullptr; - } + virtual void DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const override; virtual std::shared_ptr DoBuildIndexImpl(TChunkedBatchReader& reader) const override; virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override { + AFL_VERIFY(TBase::DoDeserializeFromProto(proto)); AFL_VERIFY(proto.HasBloomFilter()); auto& bFilter = proto.GetBloomFilter(); FalsePositiveProbability = bFilter.GetFalsePositiveProbability(); @@ -85,8 +49,8 @@ class TBloomIndexMeta: public TIndexByColumns { public: TBloomIndexMeta() = default; - TBloomIndexMeta(const std::set& columnIds, const double fpProbability) - : TBase(columnIds) + TBloomIndexMeta(const ui32 indexId, const std::set& columnIds, const double fpProbability) + : TBase(indexId, columnIds) , FalsePositiveProbability(fpProbability) { Initialize(); } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make new file mode 100644 index 000000000000..e333fcd3ef97 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + GLOBAL constructor.cpp + GLOBAL meta.cpp + GLOBAL checker.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow +) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make index 138eb4d5e12d..1bf1e550c89a 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/ya.make @@ -1,12 +1,8 @@ LIBRARY() -SRCS( - abstract.cpp -) - PEERDIR( - ydb/core/protos - ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/scheme/indexes/abstract + ydb/core/tx/columnshard/engines/scheme/indexes/bloom ) END() diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index ee10d7674b53..e54f8248f3d6 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -12,7 +12,7 @@ using namespace NOlap; namespace { -class TTestInsertTableDB : public IDbWrapper { +class TTestInsertTableDB: public IDbWrapper { public: void Insert(const TInsertedData&) override {} void Commit(const TInsertedData&) override {} @@ -22,8 +22,7 @@ class TTestInsertTableDB : public IDbWrapper { void EraseAborted(const TInsertedData&) override {} bool Load(TInsertTableAccessor&, - const TInstant&) override - { + const TInstant&) override { return true; } @@ -31,6 +30,10 @@ class TTestInsertTableDB : public IDbWrapper { void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {} bool LoadColumns(const std::function&) override { return true; } + virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} + virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} + virtual bool LoadIndexes(const std::function& /*callback*/) override { return true; } + void WriteCounter(ui32, ui64) override {} bool LoadCounters(const std::function&) override { return true; } }; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 229100ecab5e..8f1bdf36fc4e 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -137,6 +137,10 @@ class TTestDbWrapper : public IDbWrapper { return true; } + virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} + virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} + virtual bool LoadIndexes(const std::function& /*callback*/) override { return true; } + void WriteCounter(ui32 counterId, ui64 value) override { auto& counters = Indices[0].Counters; counters[counterId] = value; diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 6b5f70674c89..8a3530fc7315 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -82,6 +82,8 @@ class ICSController { bool OnStartCompaction(std::shared_ptr& changes) { return DoOnStartCompaction(changes); } + virtual void OnIndexSelectProcessed(const bool /*result*/) { + } virtual EOptimizerCompactionWeightControl GetCompactionControl() const { return EOptimizerCompactionWeightControl::Force; } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index eed2aa61ce67..a66803954727 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -8,6 +8,8 @@ class TController: public ICSController { YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0); YDB_READONLY(TAtomicCounter, Compactions, 0); YDB_READONLY(TAtomicCounter, Indexations, 0); + YDB_READONLY(TAtomicCounter, IndexesSkippingOnSelect, 0); + YDB_READONLY(TAtomicCounter, IndexesApprovedOnSelect, 0); YDB_ACCESSOR(std::optional, GuaranteeIndexationInterval, TDuration::Zero()); YDB_ACCESSOR(std::optional, PeriodicWakeupActivationPeriod, std::nullopt); YDB_ACCESSOR(std::optional, StatsReportInterval, std::nullopt); @@ -38,6 +40,13 @@ class TController: public ICSController { } public: + virtual void OnIndexSelectProcessed(const bool result) override { + if (result) { + IndexesApprovedOnSelect.Inc(); + } else { + IndexesSkippingOnSelect.Inc(); + } + } void SetCompactionControl(const EOptimizerCompactionWeightControl value) { CompactionControl = value; } diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index 6ee25067bdd4..34f40d6578f8 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace NKikimr::NOlap { @@ -115,7 +116,11 @@ TAssign TProgramBuilder::MakeFunction(const NSsa::TColumnInfo& name, Error = TStringBuilder() << "Unknown kernel for " << name.GetColumnName() << ";kernel_idx=" << func.GetKernelIdx(); return TAssign(name, EOperation::Unspecified, std::move(arguments)); } - return TAssign(name, kernelFunction, std::move(arguments), nullptr); + TAssign result(name, kernelFunction, std::move(arguments), nullptr); + if (func.HasYqlOperationId()) { + result.SetYqlOperationId(func.GetYqlOperationId()); + } + return result; } switch (func.GetId()) { @@ -241,6 +246,7 @@ TAssign TProgramBuilder::MakeFunction(const NSsa::TColumnInfo& name, case TId::FUNC_UNSPECIFIED: break; } + return TAssign(name, EOperation::Unspecified, std::move(arguments)); } @@ -430,6 +436,26 @@ bool TProgramBuilder::ExtractGroupBy(NSsa::TProgramStep& step, const NKikimrSSA: return true; } + +} + +TString TSchemaResolverColumnsOnly::GetColumnName(ui32 id, bool required /*= true*/) const { + auto* column = Schema->GetColumns().GetById(id); + AFL_VERIFY(!required || !!column); + if (column) { + return column->GetName(); + } else { + return ""; + } +} + +std::optional TSchemaResolverColumnsOnly::GetColumnIdOptional(const TString& name) const { + auto* column = Schema->GetColumns().GetByName(name); + if (!column) { + return {}; + } else { + return column->GetId(); + } } const THashMap& TProgramContainer::GetSourceColumns() const { @@ -450,11 +476,60 @@ std::set TProgramContainer::GetEarlyFilterColumns() const { return Default>(); } +bool TProgramContainer::Init(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& programProto, TString& error) { + ProgramProto = programProto; + if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { + TString out; + ::google::protobuf::TextFormat::PrintToString(programProto, &out); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out); + } + + if (programProto.HasKernels()) { + KernelsRegistry.Parse(programProto.GetKernels()); + } + + if (!ParseProgram(columnResolver, programProto, error)) { + if (!error) { + error = TStringBuilder() << "Wrong olap program"; + } + return false; + } + + return true; +} + +bool TProgramContainer::Init(const IColumnResolver& columnResolver, const NKikimrSSA::TOlapProgram& olapProgramProto, TString& error) { + NKikimrSSA::TProgram programProto; + if (!programProto.ParseFromString(olapProgramProto.GetProgram())) { + error = TStringBuilder() << "Can't parse TProgram"; + return false; + } + + if (olapProgramProto.HasParameters()) { + Y_ABORT_UNLESS(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema."); + + auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema()); + ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema); + } + + ProgramProto = programProto; + + if (!Init(columnResolver, ProgramProto, error)) { + return false; + } + if (olapProgramProto.HasIndexChecker()) { + if (!IndexChecker.DeserializeFromProto(olapProgramProto.GetIndexChecker())) { + AFL_VERIFY_DEBUG(false); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "cannot_parse_index_checker")("data", olapProgramProto.GetIndexChecker().DebugString()); + } + } + return true; +} + bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error) { Y_ABORT_UNLESS(serializedProgram); Y_ABORT_UNLESS(!OverrideProcessingColumnsVector); - NKikimrSSA::TProgram programProto; NKikimrSSA::TOlapProgram olapProgramProto; switch (programType) { @@ -464,42 +539,13 @@ bool TProgramContainer::Init(const IColumnResolver& columnResolver, NKikimrSchem return false; } - if (!programProto.ParseFromString(olapProgramProto.GetProgram())) { - error = TStringBuilder() << "Can't parse TProgram"; - return false; - } - break; default: error = TStringBuilder() << "Unsupported olap program version: " << (ui32)programType; return false; } - ProgramProto = programProto; - if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { - TString out; - ::google::protobuf::TextFormat::PrintToString(programProto, &out); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", out); - } - - if (olapProgramProto.HasParameters()) { - Y_ABORT_UNLESS(olapProgramProto.HasParametersSchema(), "Parameters are present, but there is no schema."); - - auto schema = NArrow::DeserializeSchema(olapProgramProto.GetParametersSchema()); - ProgramParameters = NArrow::DeserializeBatch(olapProgramProto.GetParameters(), schema); - } - - if (programProto.HasKernels()) { - KernelsRegistry.Parse(programProto.GetKernels()); - } - - if (!ParseProgram(columnResolver, programProto, error)) { - if (!error) { - error = TStringBuilder() << "Wrong olap program"; - } - return false; - } - return true; + return Init(columnResolver, olapProgramProto, error); } bool TProgramContainer::ParseProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program, TString& error) { diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h index b595fc1f6100..dae9ae4d94a4 100644 --- a/ydb/core/tx/program/program.h +++ b/ydb/core/tx/program/program.h @@ -7,7 +7,12 @@ #include #include #include +#include +#include +namespace NKikimr::NSchemeShard { +class TOlapSchema; +} namespace NKikimr::NOlap { class IColumnResolver { @@ -19,6 +24,26 @@ class IColumnResolver { virtual NSsa::TColumnInfo GetDefaultColumn() const = 0; }; +class TSchemaResolverColumnsOnly: public IColumnResolver { +private: + std::shared_ptr Schema; +public: + TSchemaResolverColumnsOnly(const std::shared_ptr& schema) + : Schema(schema) { + AFL_VERIFY(Schema); + } + + virtual TString GetColumnName(ui32 id, bool required = true) const override; + virtual std::optional GetColumnIdOptional(const TString& name) const override; + virtual const NTable::TScheme::TTableSchema& GetSchema() const override { + AFL_VERIFY(false); + return Default(); + } + virtual NSsa::TColumnInfo GetDefaultColumn() const override { + return NSsa::TColumnInfo::Original((ui32)NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX, NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP); + } +}; + class TProgramContainer { private: NKikimrSSA::TProgram ProgramProto; @@ -27,6 +52,7 @@ class TProgramContainer { TKernelsRegistry KernelsRegistry; std::optional> OverrideProcessingColumnsSet; std::optional> OverrideProcessingColumnsVector; + YDB_READONLY_DEF(NIndexes::TIndexCheckerContainer, IndexChecker); public: TString ProtoDebugString() const { return ProgramProto.DebugString(); @@ -53,6 +79,8 @@ class TProgramContainer { } bool Init(const IColumnResolver& columnResolver, NKikimrSchemeOp::EOlapProgramType programType, TString serializedProgram, TString& error); + bool Init(const IColumnResolver& columnResolver, const NKikimrSSA::TOlapProgram& olapProgramProto, TString& error); + bool Init(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& programProto, TString& error); const std::vector>& GetSteps() const { if (!Program) { diff --git a/ydb/core/tx/program/registry.cpp b/ydb/core/tx/program/registry.cpp index de2bd4cde4ea..f8ba71e37d78 100644 --- a/ydb/core/tx/program/registry.cpp +++ b/ydb/core/tx/program/registry.cpp @@ -31,4 +31,12 @@ bool TKernelsRegistry::Parse(const TString& serialized) { } return true; } + +NKikimr::NSsa::TFunctionPtr TKernelsRegistry::GetFunction(const size_t index) const { + if (index < Functions.size()) { + return Functions[index]; + } + return nullptr; +} + } diff --git a/ydb/core/tx/program/registry.h b/ydb/core/tx/program/registry.h index ba24f343555e..bc4f3a99e634 100644 --- a/ydb/core/tx/program/registry.h +++ b/ydb/core/tx/program/registry.h @@ -14,13 +14,7 @@ class TKernelsRegistry { public: bool Parse(const TString& serialized); - - NSsa::TFunctionPtr GetFunction(const size_t index) const { - if (index <= Functions.size()) { - return Functions[index]; - } - return nullptr; - } + NSsa::TFunctionPtr GetFunction(const size_t index) const; }; } diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp index e715b1349625..af35646873dc 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp @@ -15,6 +15,21 @@ void TOlapIndexSchema::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDes AFL_VERIFY(IndexMeta.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString()); } +bool TOlapIndexSchema::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) { + AFL_VERIFY(upsert.GetName() == GetName()); + AFL_VERIFY(!!upsert.GetIndexConstructor()); + if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) { + errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName()); + return false; + } + auto object = upsert.GetIndexConstructor()->CreateIndexMeta(GetId(), currentSchema, errors); + if (!object) { + return false; + } + IndexMeta = NBackgroundTasks::TInterfaceProtoContainer(object); + return true; +} + bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexesUpdate& schemaUpdate, IErrorCollector& errors, ui32& nextEntityId) { for (auto&& index : schemaUpdate.GetUpsertIndexes()) { auto* currentIndex = MutableByName(index.GetName()); @@ -23,11 +38,11 @@ bool TOlapIndexesDescription::ApplyUpdate(const TOlapSchema& currentSchema, cons return false; } } else { - auto meta = index.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors); + const ui32 id = nextEntityId++; + auto meta = index.GetIndexConstructor()->CreateIndexMeta(id, currentSchema, errors); if (!meta) { return false; } - const ui32 id = nextEntityId++; TOlapIndexSchema newIndex(id, index.GetName(), meta); Y_ABORT_UNLESS(IndexesByName.emplace(index.GetName(), id).second); Y_ABORT_UNLESS(Indexes.emplace(id, std::move(newIndex)).second); diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.h b/ydb/core/tx/schemeshard/olap/indexes/schema.h index d5aaa8573a20..630016fe96a5 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/schema.h +++ b/ydb/core/tx/schemeshard/olap/indexes/schema.h @@ -3,12 +3,14 @@ namespace NKikimr::NSchemeShard { +class TOlapSchema; + class TOlapIndexSchema { private: using TBase = TOlapIndexUpsert; YDB_READONLY(ui32, Id, Max()); YDB_READONLY_DEF(TString, Name); - NBackgroundTasks::TInterfaceProtoContainer IndexMeta; + YDB_READONLY_DEF(NBackgroundTasks::TInterfaceProtoContainer, IndexMeta); public: TOlapIndexSchema() = default; @@ -20,20 +22,7 @@ class TOlapIndexSchema { } - bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors) { - AFL_VERIFY(upsert.GetName() == GetName()); - AFL_VERIFY(!!upsert.GetIndexConstructor()); - if (upsert.GetIndexConstructor().GetClassName() != IndexMeta.GetClassName()) { - errors.AddError("different index classes: " + upsert.GetIndexConstructor().GetClassName() + " vs " + IndexMeta.GetClassName()); - return false; - } - auto object = upsert.GetIndexConstructor()->CreateIndexMeta(currentSchema, errors); - if (!object) { - return false; - } - IndexMeta = NBackgroundTasks::TInterfaceProtoContainer(object); - return true; - } + bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapIndexUpsert& upsert, IErrorCollector& errors); void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& indexSchema) const; void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& indexSchema); diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.h b/ydb/core/tx/schemeshard/olap/indexes/update.h index 0f1db75b0b8b..f6d0f88fa312 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/update.h +++ b/ydb/core/tx/schemeshard/olap/indexes/update.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace NKikimr::NSchemeShard { diff --git a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp index a50f1193b1a4..776b8d2737ed 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp @@ -7,11 +7,10 @@ #include #include -namespace { - using namespace NKikimr; -using namespace NSchemeShard; +using namespace NKikimr::NSchemeShard; +namespace { void ApplySharding(TTxId txId, TPathId pathId, TOlapStoreInfo::TPtr storeInfo, const TChannelsBindings& channelsBindings, diff --git a/ydb/library/arrow_kernels/operations.h b/ydb/library/arrow_kernels/operations.h index f9dafc4b50b5..bfe891274e02 100644 --- a/ydb/library/arrow_kernels/operations.h +++ b/ydb/library/arrow_kernels/operations.h @@ -1,3 +1,5 @@ +#pragma once + namespace NKikimr::NKernels { enum class EOperation { diff --git a/ydb/library/arrow_kernels/ya.make b/ydb/library/arrow_kernels/ya.make index 00b387e8b17b..f10d21e53b1e 100644 --- a/ydb/library/arrow_kernels/ya.make +++ b/ydb/library/arrow_kernels/ya.make @@ -8,6 +8,8 @@ PEERDIR( contrib/libs/apache/arrow ) +GENERATE_ENUM_SERIALIZATION(operations.h) + SRCS( func_cast.cpp ut_common.cpp diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h index c6398b459106..8a5efa8e118f 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.h +++ b/ydb/library/yql/core/arrow_kernels/request/request.h @@ -16,7 +16,7 @@ class TKernelRequestBuilder { }; enum class EBinaryOp { - And, + And = 0, Or, Xor,