diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 10f162f8baa6..212d49076d02 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -17,6 +17,10 @@ namespace NKqp { return Kikimr; } + TTestActorRuntime& TTestHelper::GetRuntime() { + return *Kikimr.GetTestServer().GetRuntime(); + } + NYdb::NTable::TSession& TTestHelper::GetSession() { return Session; } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index e374b973edbf..92b45eb60759 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -70,6 +70,7 @@ namespace NKqp { public: TTestHelper(const TKikimrSettings& settings); TKikimrRunner& GetKikimr(); + TTestActorRuntime& GetRuntime(); NYdb::NTable::TSession& GetSession(); void CreateTable(const TColumnTableBase& table); void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 811b4d03d72b..a91b40588cf0 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -5282,6 +5283,39 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Cout << output << Endl; CompareYson(output, R"([[10u;]])"); } + + Y_UNIT_TEST(DuplicatesInIncomingBatch) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + Tests::NCommon::TLoggerInit(testHelper.GetRuntime()).Initialize(); + TVector schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("id_second").SetType(NScheme::NTypeIds::Utf8).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id", "id_second"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).Add("test_res_1").AddNull().AddNull(); + tableInserter.AddRow().Add(2).Add("test_res_2").Add("val1").AddNull(); + tableInserter.AddRow().Add(3).Add("test_res_3").Add("val3").AddNull(); + tableInserter.AddRow().Add(2).Add("test_res_2").Add("val2").AddNull(); + testHelper.InsertData(testTable, tableInserter); + } + while (csController->GetIndexations().Val() == 0) { + Cout << "Wait indexation..." << Endl; + Sleep(TDuration::Seconds(2)); + } + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=2", "[[2;\"test_res_2\";#;[\"val1\"]]]"); + } + } } // namespace NKqp diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp index bae9ac68ad4b..2fcf27ebaf76 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp +++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp @@ -1,6 +1,7 @@ #include "controller.h" #include #include +#include #include namespace NKikimr::NYDBTest::NColumnShard { @@ -12,6 +13,11 @@ bool TController::DoOnAfterFilterAssembling(const std::shared_ptr& changes) { if (auto compaction = dynamic_pointer_cast(changes)) { Compactions.Inc(); diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 29bf1995ff73..eed2aa61ce67 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -7,6 +7,7 @@ class TController: public ICSController { private: YDB_READONLY(TAtomicCounter, FilteredRecordsCount, 0); YDB_READONLY(TAtomicCounter, Compactions, 0); + YDB_READONLY(TAtomicCounter, Indexations, 0); YDB_ACCESSOR(std::optional, GuaranteeIndexationInterval, TDuration::Zero()); YDB_ACCESSOR(std::optional, PeriodicWakeupActivationPeriod, std::nullopt); YDB_ACCESSOR(std::optional, StatsReportInterval, std::nullopt); @@ -16,6 +17,7 @@ class TController: public ICSController { protected: virtual bool DoOnAfterFilterAssembling(const std::shared_ptr& batch) override; virtual bool DoOnStartCompaction(std::shared_ptr& changes) override; + virtual bool DoOnWriteIndexComplete(const ui64 /*tabletId*/, const TString& /*changeClassName*/) override; virtual TDuration GetGuaranteeIndexationInterval(const TDuration defaultValue) const override { return GuaranteeIndexationInterval.value_or(defaultValue); }