From bc856901103728d2207deaa428d50c4c604641ee Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Fri, 12 Jan 2024 16:11:13 +0300 Subject: [PATCH 1/5] =?UTF-8?q?KIKIMR-20714=20=D0=A3=D0=B4=D0=B0=D0=BB?= =?UTF-8?q?=D1=8F=D0=B5=D0=BC=20LongTxService=20=D0=B2=20=D1=82=D0=B5?= =?UTF-8?q?=D1=81=D1=82=D0=B0=D1=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/kqp/ut/common/columnshard.cpp | 22 +- ydb/core/kqp/ut/common/columnshard.h | 4 +- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 49 +- ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp | 1 - ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 13 +- .../ydb/ut/CMakeLists.darwin-arm64.txt | 1 - .../ydb/ut/CMakeLists.darwin-x86_64.txt | 1 - .../ydb/ut/CMakeLists.linux-aarch64.txt | 1 - .../ydb/ut/CMakeLists.linux-x86_64.txt | 1 - .../ydb/ut/CMakeLists.windows-x86_64.txt | 1 - ydb/services/ydb/ut/ya.make | 1 - ydb/services/ydb/ydb_long_tx_ut.cpp | 456 ------------------ 12 files changed, 13 insertions(+), 538 deletions(-) delete mode 100644 ydb/services/ydb/ydb_long_tx_ut.cpp diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 10f162f8baa6..cb049857dc32 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -9,7 +9,6 @@ namespace NKqp { TTestHelper::TTestHelper(const TKikimrSettings& settings) : Kikimr(settings) , TableClient(Kikimr.GetTableClient()) - , LongTxClient(Kikimr.GetDriver()) , Session(TableClient.CreateSession().GetValueSync().GetSession()) {} @@ -27,24 +26,9 @@ namespace NKqp { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const EStatus opStatus /*= EStatus::SUCCESS*/) { - NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); - auto batch = updates.BuildArrow(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString()); - - if (onBeforeCommit) { - onBeforeCommit(); - } - - NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { + Y_UNUSED(onBeforeCommit); + BulkUpsert(table, updates, opStatus); } void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index e374b973edbf..794ab23825b6 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -3,7 +3,6 @@ #include "kqp_ut_common.h" #include #include -#include #include #include #include @@ -64,7 +63,6 @@ namespace NKqp { private: TKikimrRunner Kikimr; NYdb::NTable::TTableClient TableClient; - NYdb::NLongTx::TClient LongTxClient; NYdb::NTable::TSession Session; public: @@ -72,7 +70,7 @@ namespace NKqp { TKikimrRunner& GetKikimr(); NYdb::NTable::TSession& GetSession(); void CreateTable(const TColumnTableBase& table); - void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); + void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 83373dbbca8e..80281f1d2d6c 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1,5 +1,4 @@ #include -#include #include #include @@ -519,65 +518,23 @@ Y_UNIT_TEST_SUITE(KqpOlap) { void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead - TLocalHelper lHelper(kikimr); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForClickBench(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { UNIT_ASSERT(testTable == "/Root/benchTable"); // TODO: check schema instead - TClickHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForTableWithNulls(TKikimrRunner& kikimr, TString testTable) { UNIT_ASSERT(testTable == "/Root/tableWithNulls"); // TODO: check schema instead TTableWithNullsHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void CreateTableOfAllTypes(TKikimrRunner& kikimr) { diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 5a8e89b75436..1dbbf79f3198 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 536595ccf922..d71f4f8080fe 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -5444,7 +5443,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +/* Y_UNIT_TEST(AddColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5471,7 +5470,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +*/ Y_UNIT_TEST(AddColumnWithStore) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5580,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::GENERIC_ERROR); + testHelper.InsertData(testTable, tableInserter, {}, Ydb::StatusIds::GENERIC_ERROR); } { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); @@ -5628,7 +5627,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` ", "[[1];[2]]"); } - +/* Y_UNIT_TEST(DropColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5654,7 +5653,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } - +*/ Y_UNIT_TEST(DropColumnOldScheme) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5678,7 +5677,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::SUCCESS); + testHelper.InsertData(testTable, tableInserter); } // testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt index 5b40b5c912bb..ec2af8ae754d 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt @@ -62,7 +62,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt index 7d3a542c5833..236ce002b2ac 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt @@ -63,7 +63,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt index 10f9ea9c59a8..22602791c90c 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt @@ -66,7 +66,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt index 4262e1a9d7d6..500d6550c29a 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt @@ -67,7 +67,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt index fc2ccf0ac109..99cd79461a8f 100644 --- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt @@ -56,7 +56,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index 7817f096c68a..669e766eed71 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -22,7 +22,6 @@ SRCS( ydb_scripting_ut.cpp ydb_table_ut.cpp ydb_stats_ut.cpp - ydb_long_tx_ut.cpp ydb_logstore_ut.cpp ydb_olapstore_ut.cpp ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp deleted file mode 100644 index b46b933a12a5..000000000000 --- a/ydb/services/ydb/ydb_long_tx_ut.cpp +++ /dev/null @@ -1,456 +0,0 @@ -#include "ydb_common_ut.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace NYdb; - -namespace -{ - -static const constexpr char* TestTablePath = TTestOlap::TablePath; - -TString TestBlob() { - auto batch = TTestOlap::SampleBatch(); - return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); -} - -TVector> SplitData(const TString& data, ui32 numBatches) { - auto batch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(batch.ok()); - - NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches); - std::vector rowSharding = sharding.MakeSharding(*batch); - Y_ABORT_UNLESS(rowSharding.size() == (size_t)batch->get()->num_rows()); - - std::vector> sharded = NArrow::ShardingSplit(*batch, rowSharding, numBatches); - Y_ABORT_UNLESS(sharded.size() == numBatches); - - TVector> out; - for (size_t i = 0; i < numBatches; ++i) { - if (sharded[i]) { - Y_ABORT_UNLESS(sharded[i]->ValidateFull().ok()); - out.emplace_back(sharded[i]); - } - } - - return out; -} - -bool EqualBatches(const TString& x, const TString& y) { - auto schema = TTestOlap::ArrowSchema(); - std::shared_ptr batchX = NArrow::DeserializeBatch(x, schema); - std::shared_ptr batchY = NArrow::DeserializeBatch(y, schema); - Y_ABORT_UNLESS(batchX && batchY); - if ((batchX->num_columns() != batchY->num_columns()) || - (batchX->num_rows() != batchY->num_rows())) { - Cerr << __FILE__ << ':' << __LINE__ << " " - << batchX->num_columns() << ':' << batchX->num_rows() << " vs " - << batchY->num_columns() << ':' << batchY->num_rows() << "\n"; - return false; - } - - for (auto& column : schema->field_names()) { - auto filedX = batchX->schema()->GetFieldByName(column); - auto filedY = batchY->schema()->GetFieldByName(column); - Y_ABORT_UNLESS(filedX->type()->id() == filedY->type()->id()); - - auto arrX = batchX->GetColumnByName(column); - auto arrY = batchY->GetColumnByName(column); - - switch (filedX->type()->id()) { - case arrow::Type::INT32: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::UINT64: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::TIMESTAMP: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::BINARY: - if (!NArrow::ArrayEqualView(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::STRING: - if (!NArrow::ArrayEqualView(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - - default: - Cerr << __FILE__ << ':' << __LINE__ << "\n"; - return false; - } - } - - return true; -} - -} - - -Y_UNIT_TEST_SUITE(YdbLongTx) { - - Y_UNIT_TEST(BeginWriteCommit) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - Y_UNIT_TEST(BeginWriteRollback) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); - } - - Y_UNIT_TEST(BeginRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - Y_UNIT_TEST(WriteThenRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - // Read before write - TString beforeWriteTxId; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - beforeWriteTxId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - // Write - auto batch = TTestOlap::SampleBatch(false, 10000); - const TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Read after write - auto sharded = SplitData(data, 2); - UNIT_ASSERT_VALUES_EQUAL(sharded.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(sharded[0]->num_rows(), 4990); - UNIT_ASSERT_VALUES_EQUAL(sharded[1]->num_rows(), 5010); - - TVector expected; - for (auto batch : sharded) { - expected.push_back(NArrow::SerializeBatchNoCompression(batch)); - } - - TVector returned; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - returned.push_back(resRead.GetResult().data().data()); - // TODO: read both - - UNIT_ASSERT(EqualBatches(expected[0], returned[0]) || - EqualBatches(expected[1], returned[0])); - } - - // Read before write again - { - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - } - - Y_UNIT_TEST(ReadFutureSnapshot) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings, 1); - - NYdb::NLongTx::TClient client(connection); - - TString futureTxId; - NYdb::NLongTx::TClient::TAsyncReadResult futureRead; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - TString txId = resBeginTx.GetResult().tx_id(); - NLongTxService::TLongTxId parsed; - Y_ABORT_UNLESS(parsed.ParseString(txId)); - Y_ABORT_UNLESS(parsed.Snapshot.Step > 0); - parsed.Snapshot.Step += 2000; // 2 seconds in the future - futureTxId = parsed.ToString(); - // Cerr << "Future txId " << futureTxId << Endl; - - futureRead = client.Read(futureTxId, TestTablePath); - } - - // Write - TString data = TestBlob(); - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Await read - { - NLongTx::TLongTxReadResult resRead = futureRead.GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - - auto inputBatch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(inputBatch.ok()); - auto readBatch = NArrow::NSerialization::TBatchPayloadDeserializer(inputBatch->get()->schema()).Deserialize(resRead.GetResult().data().data()); - UNIT_ASSERT(readBatch.ok()); - UNIT_ASSERT_VALUES_EQUAL(readBatch->get()->ToString(), inputBatch->get()->ToString()); - } - } - - Y_UNIT_TEST(WriteAclChecks) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection1 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user1@builtin")); - auto connection2 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user2@builtin")); - - TTestOlap::CreateTable(*server.ServerSettings); - { - TClient annoyingClient(*server.ServerSettings); - annoyingClient.SetSecurityToken("root@builtin"); - NACLib::TDiffACL diff; - diff.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "user1@builtin"); - annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); - } - - // try user1 first - { - NYdb::NLongTx::TClient client(connection1); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // try user2 next - { - NYdb::NLongTx::TClient client(connection2); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::UNAUTHORIZED); - - NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); - } - } - - Y_UNIT_TEST(ReadAclChecks) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection1 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user1@builtin")); - auto connection2 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user2@builtin")); - - TTestOlap::CreateTable(*server.ServerSettings); - { - TClient annoyingClient(*server.ServerSettings); - annoyingClient.SetSecurityToken("root@builtin"); - NACLib::TDiffACL diff; - diff.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@builtin"); - annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); - } - - // try user1 first - { - NYdb::NLongTx::TClient client(connection1); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - } - - // try user2 next - { - NYdb::NLongTx::TClient client(connection2); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::UNAUTHORIZED); - } - } - - Y_UNIT_TEST(CreateOlapWithDirs) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings, 1, "DirA/OlapStore", "DirB/OlapTable"); - } - -} From 408d79741c7ab49ce0ad03516a0f2a0cd405fc9e Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Fri, 12 Jan 2024 22:00:25 +0300 Subject: [PATCH 2/5] fix tests --- ydb/core/testlib/cs_helper.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index a2cb1668991b..4cfa02c554ab 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -418,15 +418,14 @@ std::shared_ptr TTableWithNullsHelper::TestArrowBatch(ui64, Y_ABORT_UNLESS(bJsonDoc.AppendNull().ok()); } - auto maybeJsonDoc = NBinaryJson::SerializeToBinaryJson(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); - Y_ABORT_UNLESS(maybeJsonDoc.Defined()); + const auto maybeJsonDoc = std::string(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); for (size_t i = rowCount / 2 + 1; i <= rowCount; ++i) { Y_ABORT_UNLESS(bId.Append(i).ok()); Y_ABORT_UNLESS(bResourceId.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bLevel.AppendNull().ok()); Y_ABORT_UNLESS(bBinaryStr.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bJsonVal.AppendNull().ok()); - Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc->Data(), maybeJsonDoc->Size()).ok()); + Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc.data(), maybeJsonDoc.length()).ok()); } std::shared_ptr aId; From 9222552b593de3ebfc93e6ddb4153229dda58bdc Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Sat, 13 Jan 2024 09:41:03 +0300 Subject: [PATCH 3/5] Remove test DropColumnOldScheme --- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 27 ------------------------ 1 file changed, 27 deletions(-) diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index d71f4f8080fe..f11f62471f92 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5654,33 +5654,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } */ - Y_UNIT_TEST(DropColumnOldScheme) { - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - - TVector schema = { - TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), - TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), - TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) - }; - - TTestHelper::TColumnTable testTable; - - testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); - testHelper.CreateTable(testTable); - { - auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`DROP COLUMN resource_id;"; - auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); - } - { - TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); - tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); - testHelper.InsertData(testTable, tableInserter); - } - // testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); - } Y_UNIT_TEST(DropColumnOldSchemeBulkUpsert) { TKikimrSettings runnerSettings; From b25eb993188128d8c3562c0e660ce51573b4b26c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Sun, 14 Jan 2024 17:17:05 +0300 Subject: [PATCH 4/5] fix issues --- ydb/core/kqp/ut/common/columnshard.cpp | 5 - ydb/core/kqp/ut/common/columnshard.h | 1 - ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 14 +- .../ydb/ut/CMakeLists.darwin-arm64.txt | 1 + .../ydb/ut/CMakeLists.darwin-x86_64.txt | 1 + .../ydb/ut/CMakeLists.linux-aarch64.txt | 1 + .../ydb/ut/CMakeLists.linux-x86_64.txt | 1 + .../ydb/ut/CMakeLists.windows-x86_64.txt | 1 + ydb/services/ydb/ut/ya.make | 1 + ydb/services/ydb/ydb_long_tx_ut.cpp | 456 ++++++++++++++++++ 10 files changed, 469 insertions(+), 13 deletions(-) create mode 100644 ydb/services/ydb/ydb_long_tx_ut.cpp diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index cb049857dc32..a461644ed66f 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -26,11 +26,6 @@ namespace NKqp { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { - Y_UNUSED(onBeforeCommit); - BulkUpsert(table, updates, opStatus); - } - void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { Y_UNUSED(opStatus); NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer()); diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 794ab23825b6..a1584d60f9b8 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -70,7 +70,6 @@ namespace NKqp { TKikimrRunner& GetKikimr(); NYdb::NTable::TSession& GetSession(); void CreateTable(const TColumnTableBase& table); - void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index f11f62471f92..5b6d5781a536 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5377,7 +5377,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); @@ -5406,7 +5406,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add(200); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]"); @@ -5493,7 +5493,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); @@ -5523,7 +5523,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add(200); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]"); @@ -5579,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter, {}, Ydb::StatusIds::GENERIC_ERROR); + testHelper.BulkUpsert(testTable, tableInserter, {}, Ydb::StatusIds::GENERIC_ERROR); } { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); @@ -5610,7 +5610,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); { @@ -5701,7 +5701,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); { diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt index ec2af8ae754d..5b40b5c912bb 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt @@ -62,6 +62,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt index 236ce002b2ac..7d3a542c5833 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt @@ -63,6 +63,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt index 22602791c90c..10f9ea9c59a8 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt @@ -66,6 +66,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt index 500d6550c29a..4262e1a9d7d6 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt @@ -67,6 +67,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt index 99cd79461a8f..fc2ccf0ac109 100644 --- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt @@ -56,6 +56,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index 669e766eed71..7817f096c68a 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -22,6 +22,7 @@ SRCS( ydb_scripting_ut.cpp ydb_table_ut.cpp ydb_stats_ut.cpp + ydb_long_tx_ut.cpp ydb_logstore_ut.cpp ydb_olapstore_ut.cpp ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp new file mode 100644 index 000000000000..b46b933a12a5 --- /dev/null +++ b/ydb/services/ydb/ydb_long_tx_ut.cpp @@ -0,0 +1,456 @@ +#include "ydb_common_ut.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace NYdb; + +namespace +{ + +static const constexpr char* TestTablePath = TTestOlap::TablePath; + +TString TestBlob() { + auto batch = TTestOlap::SampleBatch(); + return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); +} + +TVector> SplitData(const TString& data, ui32 numBatches) { + auto batch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); + UNIT_ASSERT(batch.ok()); + + NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches); + std::vector rowSharding = sharding.MakeSharding(*batch); + Y_ABORT_UNLESS(rowSharding.size() == (size_t)batch->get()->num_rows()); + + std::vector> sharded = NArrow::ShardingSplit(*batch, rowSharding, numBatches); + Y_ABORT_UNLESS(sharded.size() == numBatches); + + TVector> out; + for (size_t i = 0; i < numBatches; ++i) { + if (sharded[i]) { + Y_ABORT_UNLESS(sharded[i]->ValidateFull().ok()); + out.emplace_back(sharded[i]); + } + } + + return out; +} + +bool EqualBatches(const TString& x, const TString& y) { + auto schema = TTestOlap::ArrowSchema(); + std::shared_ptr batchX = NArrow::DeserializeBatch(x, schema); + std::shared_ptr batchY = NArrow::DeserializeBatch(y, schema); + Y_ABORT_UNLESS(batchX && batchY); + if ((batchX->num_columns() != batchY->num_columns()) || + (batchX->num_rows() != batchY->num_rows())) { + Cerr << __FILE__ << ':' << __LINE__ << " " + << batchX->num_columns() << ':' << batchX->num_rows() << " vs " + << batchY->num_columns() << ':' << batchY->num_rows() << "\n"; + return false; + } + + for (auto& column : schema->field_names()) { + auto filedX = batchX->schema()->GetFieldByName(column); + auto filedY = batchY->schema()->GetFieldByName(column); + Y_ABORT_UNLESS(filedX->type()->id() == filedY->type()->id()); + + auto arrX = batchX->GetColumnByName(column); + auto arrY = batchY->GetColumnByName(column); + + switch (filedX->type()->id()) { + case arrow::Type::INT32: + if (!NArrow::ArrayEqualValue(arrX, arrY)) { + Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; + return false; + } + break; + case arrow::Type::UINT64: + if (!NArrow::ArrayEqualValue(arrX, arrY)) { + Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; + return false; + } + break; + case arrow::Type::TIMESTAMP: + if (!NArrow::ArrayEqualValue(arrX, arrY)) { + Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; + return false; + } + break; + case arrow::Type::BINARY: + if (!NArrow::ArrayEqualView(arrX, arrY)) { + Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; + return false; + } + break; + case arrow::Type::STRING: + if (!NArrow::ArrayEqualView(arrX, arrY)) { + Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; + return false; + } + break; + + default: + Cerr << __FILE__ << ':' << __LINE__ << "\n"; + return false; + } + } + + return true; +} + +} + + +Y_UNIT_TEST_SUITE(YdbLongTx) { + + Y_UNIT_TEST(BeginWriteCommit) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings); + + NYdb::NLongTx::TClient client(connection); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + TString data = TestBlob(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); + + NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); + } + + Y_UNIT_TEST(BeginWriteRollback) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings); + + NYdb::NLongTx::TClient client(connection); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + TString data = TestBlob(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); + + NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); + } + + Y_UNIT_TEST(BeginRead) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings); + + NYdb::NLongTx::TClient client(connection); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); + } + + Y_UNIT_TEST(WriteThenRead) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings); + + NYdb::NLongTx::TClient client(connection); + + // Read before write + TString beforeWriteTxId; + { + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + beforeWriteTxId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); + } + + // Write + auto batch = TTestOlap::SampleBatch(false, 10000); + const TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); + + { + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); + + NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); + } + + // Read after write + auto sharded = SplitData(data, 2); + UNIT_ASSERT_VALUES_EQUAL(sharded.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(sharded[0]->num_rows(), 4990); + UNIT_ASSERT_VALUES_EQUAL(sharded[1]->num_rows(), 5010); + + TVector expected; + for (auto batch : sharded) { + expected.push_back(NArrow::SerializeBatchNoCompression(batch)); + } + + TVector returned; + { + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + returned.push_back(resRead.GetResult().data().data()); + // TODO: read both + + UNIT_ASSERT(EqualBatches(expected[0], returned[0]) || + EqualBatches(expected[1], returned[0])); + } + + // Read before write again + { + NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); + } + } + + Y_UNIT_TEST(ReadFutureSnapshot) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings, 1); + + NYdb::NLongTx::TClient client(connection); + + TString futureTxId; + NYdb::NLongTx::TClient::TAsyncReadResult futureRead; + { + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + TString txId = resBeginTx.GetResult().tx_id(); + NLongTxService::TLongTxId parsed; + Y_ABORT_UNLESS(parsed.ParseString(txId)); + Y_ABORT_UNLESS(parsed.Snapshot.Step > 0); + parsed.Snapshot.Step += 2000; // 2 seconds in the future + futureTxId = parsed.ToString(); + // Cerr << "Future txId " << futureTxId << Endl; + + futureRead = client.Read(futureTxId, TestTablePath); + } + + // Write + TString data = TestBlob(); + { + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); + + NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); + } + + // Await read + { + NLongTx::TLongTxReadResult resRead = futureRead.GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + + auto inputBatch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); + UNIT_ASSERT(inputBatch.ok()); + auto readBatch = NArrow::NSerialization::TBatchPayloadDeserializer(inputBatch->get()->schema()).Deserialize(resRead.GetResult().data().data()); + UNIT_ASSERT(readBatch.ok()); + UNIT_ASSERT_VALUES_EQUAL(readBatch->get()->ToString(), inputBatch->get()->ToString()); + } + } + + Y_UNIT_TEST(WriteAclChecks) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection1 = NYdb::TDriver(TDriverConfig() + .SetEndpoint(location) + .SetDatabase("/Root") + .SetAuthToken("user1@builtin")); + auto connection2 = NYdb::TDriver(TDriverConfig() + .SetEndpoint(location) + .SetDatabase("/Root") + .SetAuthToken("user2@builtin")); + + TTestOlap::CreateTable(*server.ServerSettings); + { + TClient annoyingClient(*server.ServerSettings); + annoyingClient.SetSecurityToken("root@builtin"); + NACLib::TDiffACL diff; + diff.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "user1@builtin"); + annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); + } + + // try user1 first + { + NYdb::NLongTx::TClient client(connection1); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + TString data = TestBlob(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); + + NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); + } + + // try user2 next + { + NYdb::NLongTx::TClient client(connection2); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + TString data = TestBlob(); + + NLongTx::TLongTxWriteResult resWrite = + client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::UNAUTHORIZED); + + NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); + } + } + + Y_UNIT_TEST(ReadAclChecks) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection1 = NYdb::TDriver(TDriverConfig() + .SetEndpoint(location) + .SetDatabase("/Root") + .SetAuthToken("user1@builtin")); + auto connection2 = NYdb::TDriver(TDriverConfig() + .SetEndpoint(location) + .SetDatabase("/Root") + .SetAuthToken("user2@builtin")); + + TTestOlap::CreateTable(*server.ServerSettings); + { + TClient annoyingClient(*server.ServerSettings); + annoyingClient.SetSecurityToken("root@builtin"); + NACLib::TDiffACL diff; + diff.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@builtin"); + annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); + } + + // try user1 first + { + NYdb::NLongTx::TClient client(connection1); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); + } + + // try user2 next + { + NYdb::NLongTx::TClient client(connection2); + + NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); + + auto txId = resBeginTx.GetResult().tx_id(); + + NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::UNAUTHORIZED); + } + } + + Y_UNIT_TEST(CreateOlapWithDirs) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + TTestOlap::CreateTable(*server.ServerSettings, 1, "DirA/OlapStore", "DirB/OlapTable"); + } + +} From dfacff4e065f965ed404b1dc405e05ea50e98512 Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Mon, 15 Jan 2024 13:00:15 +0300 Subject: [PATCH 5/5] fix tests after merge --- ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp | 8 ++++---- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 2 +- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp index 2d34fb23496a..f77b5a019f20 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp @@ -48,7 +48,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(1)); @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { for (size_t i = 0; i < inserted_rows; i++) { tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(1)); @@ -135,7 +135,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { .Add("test_res_" + std::to_string(i + t * tables_in_store)) .AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(20)); @@ -155,4 +155,4 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { } } // namespace NKqp -} // namespace NKikimr \ No newline at end of file +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 9a630f2ebe3c..4013a0429d16 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -5264,7 +5264,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { tableInserter.AddRow().Add(2).Add("test_res_2").Add("val1").AddNull(); tableInserter.AddRow().Add(3).Add("test_res_3").Add("val3").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add("val2").AddNull(); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } while (csController->GetIndexations().Val() == 0) { Cout << "Wait indexation..." << Endl; diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 5b6d5781a536..bef64ef18078 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5579,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.BulkUpsert(testTable, tableInserter, {}, Ydb::StatusIds::GENERIC_ERROR); + testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR); } { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));