From bc856901103728d2207deaa428d50c4c604641ee Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Fri, 12 Jan 2024 16:11:13 +0300 Subject: [PATCH 1/8] =?UTF-8?q?KIKIMR-20714=20=D0=A3=D0=B4=D0=B0=D0=BB?= =?UTF-8?q?=D1=8F=D0=B5=D0=BC=20LongTxService=20=D0=B2=20=D1=82=D0=B5?= =?UTF-8?q?=D1=81=D1=82=D0=B0=D1=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/kqp/ut/common/columnshard.cpp | 22 +- ydb/core/kqp/ut/common/columnshard.h | 4 +- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 49 +- ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp | 1 - ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 13 +- .../ydb/ut/CMakeLists.darwin-arm64.txt | 1 - .../ydb/ut/CMakeLists.darwin-x86_64.txt | 1 - .../ydb/ut/CMakeLists.linux-aarch64.txt | 1 - .../ydb/ut/CMakeLists.linux-x86_64.txt | 1 - .../ydb/ut/CMakeLists.windows-x86_64.txt | 1 - ydb/services/ydb/ut/ya.make | 1 - ydb/services/ydb/ydb_long_tx_ut.cpp | 456 ------------------ 12 files changed, 13 insertions(+), 538 deletions(-) delete mode 100644 ydb/services/ydb/ydb_long_tx_ut.cpp diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 10f162f8baa6..cb049857dc32 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -9,7 +9,6 @@ namespace NKqp { TTestHelper::TTestHelper(const TKikimrSettings& settings) : Kikimr(settings) , TableClient(Kikimr.GetTableClient()) - , LongTxClient(Kikimr.GetDriver()) , Session(TableClient.CreateSession().GetValueSync().GetSession()) {} @@ -27,24 +26,9 @@ namespace NKqp { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const EStatus opStatus /*= EStatus::SUCCESS*/) { - NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); - auto batch = updates.BuildArrow(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString()); - - if (onBeforeCommit) { - onBeforeCommit(); - } - - NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { + Y_UNUSED(onBeforeCommit); + BulkUpsert(table, updates, opStatus); } void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index e374b973edbf..794ab23825b6 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -3,7 +3,6 @@ #include "kqp_ut_common.h" #include #include -#include #include #include #include @@ -64,7 +63,6 @@ namespace NKqp { private: TKikimrRunner Kikimr; NYdb::NTable::TTableClient TableClient; - NYdb::NLongTx::TClient LongTxClient; NYdb::NTable::TSession Session; public: @@ -72,7 +70,7 @@ namespace NKqp { TKikimrRunner& GetKikimr(); NYdb::NTable::TSession& GetSession(); void CreateTable(const TColumnTableBase& table); - void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); + void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 83373dbbca8e..80281f1d2d6c 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1,5 +1,4 @@ #include -#include #include #include @@ -519,65 +518,23 @@ Y_UNIT_TEST_SUITE(KqpOlap) { void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead - TLocalHelper lHelper(kikimr); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForClickBench(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { UNIT_ASSERT(testTable == "/Root/benchTable"); // TODO: check schema instead - TClickHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForTableWithNulls(TKikimrRunner& kikimr, TString testTable) { UNIT_ASSERT(testTable == "/Root/tableWithNulls"); // TODO: check schema instead TTableWithNullsHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void CreateTableOfAllTypes(TKikimrRunner& kikimr) { diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 5a8e89b75436..1dbbf79f3198 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 536595ccf922..d71f4f8080fe 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -5444,7 +5443,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +/* Y_UNIT_TEST(AddColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5471,7 +5470,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +*/ Y_UNIT_TEST(AddColumnWithStore) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5580,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::GENERIC_ERROR); + testHelper.InsertData(testTable, tableInserter, {}, Ydb::StatusIds::GENERIC_ERROR); } { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); @@ -5628,7 +5627,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` ", "[[1];[2]]"); } - +/* Y_UNIT_TEST(DropColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5654,7 +5653,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } - +*/ Y_UNIT_TEST(DropColumnOldScheme) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5678,7 +5677,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::SUCCESS); + testHelper.InsertData(testTable, tableInserter); } // testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt index 5b40b5c912bb..ec2af8ae754d 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-arm64.txt @@ -62,7 +62,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt index 7d3a542c5833..236ce002b2ac 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt @@ -63,7 +63,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt index 10f9ea9c59a8..22602791c90c 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt @@ -66,7 +66,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt index 4262e1a9d7d6..500d6550c29a 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt @@ -67,7 +67,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt index fc2ccf0ac109..99cd79461a8f 100644 --- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt @@ -56,7 +56,6 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_stats_ut.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index 7817f096c68a..669e766eed71 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -22,7 +22,6 @@ SRCS( ydb_scripting_ut.cpp ydb_table_ut.cpp ydb_stats_ut.cpp - ydb_long_tx_ut.cpp ydb_logstore_ut.cpp ydb_olapstore_ut.cpp ydb_monitoring_ut.cpp diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp deleted file mode 100644 index b46b933a12a5..000000000000 --- a/ydb/services/ydb/ydb_long_tx_ut.cpp +++ /dev/null @@ -1,456 +0,0 @@ -#include "ydb_common_ut.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace NYdb; - -namespace -{ - -static const constexpr char* TestTablePath = TTestOlap::TablePath; - -TString TestBlob() { - auto batch = TTestOlap::SampleBatch(); - return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); -} - -TVector> SplitData(const TString& data, ui32 numBatches) { - auto batch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(batch.ok()); - - NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches); - std::vector rowSharding = sharding.MakeSharding(*batch); - Y_ABORT_UNLESS(rowSharding.size() == (size_t)batch->get()->num_rows()); - - std::vector> sharded = NArrow::ShardingSplit(*batch, rowSharding, numBatches); - Y_ABORT_UNLESS(sharded.size() == numBatches); - - TVector> out; - for (size_t i = 0; i < numBatches; ++i) { - if (sharded[i]) { - Y_ABORT_UNLESS(sharded[i]->ValidateFull().ok()); - out.emplace_back(sharded[i]); - } - } - - return out; -} - -bool EqualBatches(const TString& x, const TString& y) { - auto schema = TTestOlap::ArrowSchema(); - std::shared_ptr batchX = NArrow::DeserializeBatch(x, schema); - std::shared_ptr batchY = NArrow::DeserializeBatch(y, schema); - Y_ABORT_UNLESS(batchX && batchY); - if ((batchX->num_columns() != batchY->num_columns()) || - (batchX->num_rows() != batchY->num_rows())) { - Cerr << __FILE__ << ':' << __LINE__ << " " - << batchX->num_columns() << ':' << batchX->num_rows() << " vs " - << batchY->num_columns() << ':' << batchY->num_rows() << "\n"; - return false; - } - - for (auto& column : schema->field_names()) { - auto filedX = batchX->schema()->GetFieldByName(column); - auto filedY = batchY->schema()->GetFieldByName(column); - Y_ABORT_UNLESS(filedX->type()->id() == filedY->type()->id()); - - auto arrX = batchX->GetColumnByName(column); - auto arrY = batchY->GetColumnByName(column); - - switch (filedX->type()->id()) { - case arrow::Type::INT32: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::UINT64: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::TIMESTAMP: - if (!NArrow::ArrayEqualValue(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::BINARY: - if (!NArrow::ArrayEqualView(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::STRING: - if (!NArrow::ArrayEqualView(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - - default: - Cerr << __FILE__ << ':' << __LINE__ << "\n"; - return false; - } - } - - return true; -} - -} - - -Y_UNIT_TEST_SUITE(YdbLongTx) { - - Y_UNIT_TEST(BeginWriteCommit) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - Y_UNIT_TEST(BeginWriteRollback) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); - } - - Y_UNIT_TEST(BeginRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - Y_UNIT_TEST(WriteThenRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - // Read before write - TString beforeWriteTxId; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - beforeWriteTxId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - // Write - auto batch = TTestOlap::SampleBatch(false, 10000); - const TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Read after write - auto sharded = SplitData(data, 2); - UNIT_ASSERT_VALUES_EQUAL(sharded.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(sharded[0]->num_rows(), 4990); - UNIT_ASSERT_VALUES_EQUAL(sharded[1]->num_rows(), 5010); - - TVector expected; - for (auto batch : sharded) { - expected.push_back(NArrow::SerializeBatchNoCompression(batch)); - } - - TVector returned; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - returned.push_back(resRead.GetResult().data().data()); - // TODO: read both - - UNIT_ASSERT(EqualBatches(expected[0], returned[0]) || - EqualBatches(expected[1], returned[0])); - } - - // Read before write again - { - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - } - - Y_UNIT_TEST(ReadFutureSnapshot) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings, 1); - - NYdb::NLongTx::TClient client(connection); - - TString futureTxId; - NYdb::NLongTx::TClient::TAsyncReadResult futureRead; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - TString txId = resBeginTx.GetResult().tx_id(); - NLongTxService::TLongTxId parsed; - Y_ABORT_UNLESS(parsed.ParseString(txId)); - Y_ABORT_UNLESS(parsed.Snapshot.Step > 0); - parsed.Snapshot.Step += 2000; // 2 seconds in the future - futureTxId = parsed.ToString(); - // Cerr << "Future txId " << futureTxId << Endl; - - futureRead = client.Read(futureTxId, TestTablePath); - } - - // Write - TString data = TestBlob(); - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Await read - { - NLongTx::TLongTxReadResult resRead = futureRead.GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - - auto inputBatch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(inputBatch.ok()); - auto readBatch = NArrow::NSerialization::TBatchPayloadDeserializer(inputBatch->get()->schema()).Deserialize(resRead.GetResult().data().data()); - UNIT_ASSERT(readBatch.ok()); - UNIT_ASSERT_VALUES_EQUAL(readBatch->get()->ToString(), inputBatch->get()->ToString()); - } - } - - Y_UNIT_TEST(WriteAclChecks) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection1 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user1@builtin")); - auto connection2 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user2@builtin")); - - TTestOlap::CreateTable(*server.ServerSettings); - { - TClient annoyingClient(*server.ServerSettings); - annoyingClient.SetSecurityToken("root@builtin"); - NACLib::TDiffACL diff; - diff.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, "user1@builtin"); - annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); - } - - // try user1 first - { - NYdb::NLongTx::TClient client(connection1); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // try user2 next - { - NYdb::NLongTx::TClient client(connection2); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - TString data = TestBlob(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::UNAUTHORIZED); - - NLongTx::TLongTxRollbackResult resRollbackTx = client.RollbackTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); - } - } - - Y_UNIT_TEST(ReadAclChecks) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection1 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user1@builtin")); - auto connection2 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user2@builtin")); - - TTestOlap::CreateTable(*server.ServerSettings); - { - TClient annoyingClient(*server.ServerSettings); - annoyingClient.SetSecurityToken("root@builtin"); - NACLib::TDiffACL diff; - diff.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@builtin"); - annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); - } - - // try user1 first - { - NYdb::NLongTx::TClient client(connection1); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - } - - // try user2 next - { - NYdb::NLongTx::TClient client(connection2); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::UNAUTHORIZED); - } - } - - Y_UNIT_TEST(CreateOlapWithDirs) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings, 1, "DirA/OlapStore", "DirB/OlapTable"); - } - -} From 408d79741c7ab49ce0ad03516a0f2a0cd405fc9e Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Fri, 12 Jan 2024 22:00:25 +0300 Subject: [PATCH 2/8] fix tests --- ydb/core/testlib/cs_helper.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index a2cb1668991b..4cfa02c554ab 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -418,15 +418,14 @@ std::shared_ptr TTableWithNullsHelper::TestArrowBatch(ui64, Y_ABORT_UNLESS(bJsonDoc.AppendNull().ok()); } - auto maybeJsonDoc = NBinaryJson::SerializeToBinaryJson(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); - Y_ABORT_UNLESS(maybeJsonDoc.Defined()); + const auto maybeJsonDoc = std::string(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); for (size_t i = rowCount / 2 + 1; i <= rowCount; ++i) { Y_ABORT_UNLESS(bId.Append(i).ok()); Y_ABORT_UNLESS(bResourceId.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bLevel.AppendNull().ok()); Y_ABORT_UNLESS(bBinaryStr.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bJsonVal.AppendNull().ok()); - Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc->Data(), maybeJsonDoc->Size()).ok()); + Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc.data(), maybeJsonDoc.length()).ok()); } std::shared_ptr aId; From 9222552b593de3ebfc93e6ddb4153229dda58bdc Mon Sep 17 00:00:00 2001 From: Oleg Geller Date: Sat, 13 Jan 2024 09:41:03 +0300 Subject: [PATCH 3/8] Remove test DropColumnOldScheme --- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 27 ------------------------ 1 file changed, 27 deletions(-) diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index d71f4f8080fe..f11f62471f92 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5654,33 +5654,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } */ - Y_UNIT_TEST(DropColumnOldScheme) { - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - - TVector schema = { - TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), - TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), - TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) - }; - - TTestHelper::TColumnTable testTable; - - testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); - testHelper.CreateTable(testTable); - { - auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`DROP COLUMN resource_id;"; - auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); - } - { - TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); - tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); - testHelper.InsertData(testTable, tableInserter); - } - // testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); - } Y_UNIT_TEST(DropColumnOldSchemeBulkUpsert) { TKikimrSettings runnerSettings; From e88b8f0a1a2f356bb36fe43d5fc57c5faee74740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Tue, 16 Jan 2024 16:48:20 +0300 Subject: [PATCH 4/8] =?UTF-8?q?KIKIMR-20714:=20=D0=A3=D0=B4=D0=B0=D0=BB?= =?UTF-8?q?=D1=8F=D0=B5=D0=BC=20LongTxService=20grpc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/grpc_services/rpc_long_tx.cpp | 373 ------------------ ydb/core/grpc_services/service_longtx.h | 16 - .../grpc/draft/CMakeLists.darwin-arm64.txt | 1 - .../grpc/draft/CMakeLists.darwin-x86_64.txt | 1 - .../grpc/draft/CMakeLists.linux-aarch64.txt | 1 - .../grpc/draft/CMakeLists.linux-x86_64.txt | 1 - .../grpc/draft/CMakeLists.windows-x86_64.txt | 1 - ydb/public/api/grpc/draft/ya.make | 1 - .../api/grpc/draft/ydb_long_tx_v1.proto | 14 - .../api/protos/CMakeLists.darwin-arm64.txt | 1 - .../api/protos/CMakeLists.darwin-x86_64.txt | 1 - .../api/protos/CMakeLists.linux-aarch64.txt | 1 - .../api/protos/CMakeLists.linux-x86_64.txt | 1 - .../api/protos/CMakeLists.windows-x86_64.txt | 1 - ydb/public/api/protos/draft/ydb_long_tx.proto | 89 ----- ydb/public/api/protos/ya.make | 1 - .../client/draft/CMakeLists.darwin-arm64.txt | 1 - .../client/draft/CMakeLists.darwin-x86_64.txt | 1 - .../client/draft/CMakeLists.linux-aarch64.txt | 1 - .../client/draft/CMakeLists.linux-x86_64.txt | 1 - .../draft/CMakeLists.windows-x86_64.txt | 1 - ydb/public/sdk/cpp/client/draft/ya.make | 1 - .../sdk/cpp/client/draft/ydb_long_tx.cpp | 105 ----- ydb/public/sdk/cpp/client/draft/ydb_long_tx.h | 103 ----- ydb/services/ydb/CMakeLists.darwin-arm64.txt | 1 - ydb/services/ydb/CMakeLists.darwin-x86_64.txt | 1 - ydb/services/ydb/CMakeLists.linux-aarch64.txt | 1 - ydb/services/ydb/CMakeLists.linux-x86_64.txt | 1 - .../ydb/CMakeLists.windows-x86_64.txt | 1 - ydb/services/ydb/ya.make | 1 - ydb/services/ydb/ydb_long_tx.cpp | 35 -- ydb/services/ydb/ydb_long_tx.h | 21 - 32 files changed, 780 deletions(-) delete mode 100644 ydb/core/grpc_services/service_longtx.h delete mode 100644 ydb/public/api/grpc/draft/ydb_long_tx_v1.proto delete mode 100644 ydb/public/api/protos/draft/ydb_long_tx.proto delete mode 100644 ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp delete mode 100644 ydb/public/sdk/cpp/client/draft/ydb_long_tx.h delete mode 100644 ydb/services/ydb/ydb_long_tx.cpp delete mode 100644 ydb/services/ydb/ydb_long_tx.h diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 80e9d7e76821..59dc26c2990d 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -1,8 +1,5 @@ #include "rpc_common/rpc_common.h" #include "rpc_deferrable.h" -#include "service_longtx.h" - -#include #include #include @@ -23,235 +20,10 @@ namespace NKikimr { -namespace { - -using TEvLongTxBeginRequest = NGRpcService::TGrpcRequestOperationCall; -using TEvLongTxCommitRequest = NGRpcService::TGrpcRequestOperationCall; -using TEvLongTxRollbackRequest = NGRpcService::TGrpcRequestOperationCall; -using TEvLongTxWriteRequest = NGRpcService::TGrpcRequestOperationCall; - -} - namespace NGRpcService { using namespace NActors; using namespace NLongTxService; -class TLongTxBeginRPC : public TActorBootstrapped { - using TBase = TActorBootstrapped; - -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - explicit TLongTxBeginRPC(std::unique_ptr request) - : TBase() - , Request(std::move(request)) - , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) - {} - - void Bootstrap() { - const auto* req = TEvLongTxBeginRequest::GetProtoRequest(Request); - - NKikimrLongTxService::TEvBeginTx::EMode mode = {}; - switch (req->tx_type()) { - case Ydb::LongTx::BeginTransactionRequest::READ: - mode = NKikimrLongTxService::TEvBeginTx::MODE_READ_ONLY; - break; - case Ydb::LongTx::BeginTransactionRequest::WRITE: - mode = NKikimrLongTxService::TEvBeginTx::MODE_WRITE_ONLY; - break; - default: - // TODO: report error - break; - } - - Send(MakeLongTxServiceID(SelfId().NodeId()), new TEvLongTxService::TEvBeginTx(DatabaseName, mode)); - Become(&TThis::StateWork); - } - -private: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvLongTxService::TEvBeginTxResult, Handle); - } - } - - void Handle(TEvLongTxService::TEvBeginTxResult::TPtr& ev) { - const auto* msg = ev->Get(); - - if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { - NYql::TIssues issues; - NYql::IssuesFromMessage(msg->Record.GetIssues(), issues); - if (issues) { - Request->RaiseIssues(std::move(issues)); - } - Request->ReplyWithYdbStatus(msg->Record.GetStatus()); - return PassAway(); - } - - Ydb::LongTx::BeginTransactionResult result; - result.set_tx_id(msg->GetLongTxId().ToString()); - ReplySuccess(result); - } - - void ReplySuccess(const Ydb::LongTx::BeginTransactionResult& result) { - Request->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } - -private: - std::unique_ptr Request; - TString DatabaseName; -}; - -// - -class TLongTxCommitRPC : public TActorBootstrapped { - using TBase = TActorBootstrapped; -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - explicit TLongTxCommitRPC(std::unique_ptr request) - : TBase() - , Request(std::move(request)) - { - } - - void Bootstrap() { - const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request); - - TString errMsg; - if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { - return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg); - } - - Send(MakeLongTxServiceID(SelfId().NodeId()), new TEvLongTxService::TEvCommitTx(LongTxId)); - Become(&TThis::StateWork); - } - -private: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvLongTxService::TEvCommitTxResult, Handle); - } - } - - void Handle(TEvLongTxService::TEvCommitTxResult::TPtr& ev) { - const auto* msg = ev->Get(); - - if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { - NYql::TIssues issues; - NYql::IssuesFromMessage(msg->Record.GetIssues(), issues); - if (issues) { - Request->RaiseIssues(std::move(issues)); - } - Request->ReplyWithYdbStatus(msg->Record.GetStatus()); - return PassAway(); - } - - Ydb::LongTx::CommitTransactionResult result; - const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request); - result.set_tx_id(req->tx_id()); - ReplySuccess(result); - } - - void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message) { - if (!message.empty()) { - Request->RaiseIssue(NYql::TIssue(message)); - } - Request->ReplyWithYdbStatus(status); - PassAway(); - } - - void ReplySuccess(const Ydb::LongTx::CommitTransactionResult& result) { - Request->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } - -private: - std::unique_ptr Request; - TLongTxId LongTxId; -}; - -// - -class TLongTxRollbackRPC : public TActorBootstrapped { - using TBase = TActorBootstrapped; -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - explicit TLongTxRollbackRPC(std::unique_ptr request) - : TBase() - , Request(std::move(request)) - { - } - - void Bootstrap() { - const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request); - - TString errMsg; - if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { - return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg); - } - - Send(MakeLongTxServiceID(SelfId().NodeId()), new TEvLongTxService::TEvRollbackTx(LongTxId)); - Become(&TThis::StateWork); - } - -private: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvLongTxService::TEvRollbackTxResult, Handle); - } - } - - void Handle(TEvLongTxService::TEvRollbackTxResult::TPtr& ev) { - const auto* msg = ev->Get(); - - if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { - NYql::TIssues issues; - NYql::IssuesFromMessage(msg->Record.GetIssues(), issues); - if (issues) { - Request->RaiseIssues(std::move(issues)); - } - Request->ReplyWithYdbStatus(msg->Record.GetStatus()); - return PassAway(); - } - - Ydb::LongTx::RollbackTransactionResult result; - const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request); - result.set_tx_id(req->tx_id()); - ReplySuccess(result); - } - - void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message) { - if (!message.empty()) { - Request->RaiseIssue(NYql::TIssue(message)); - } - Request->ReplyWithYdbStatus(status); - PassAway(); - } - - void ReplySuccess(const Ydb::LongTx::RollbackTransactionResult& result) { - Request->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } - -private: - std::unique_ptr Request; - TLongTxId LongTxId; -}; - - // Common logic of LongTx Write that takes care of splitting the data according to the sharding scheme, // sending it to shards and collecting their responses template @@ -411,136 +183,6 @@ class TLongTxWriteBase : public TActorBootstrapped { bool IndexReady = false; }; - -// GRPC call implementation of LongTx Write -class TLongTxWriteRPC : public TLongTxWriteBase { - using TBase = TLongTxWriteBase; - - class TProtoDataWrapper : public NEvWrite::IShardsSplitter::IEvWriteDataAccessor { - const TEvLongTxWriteRequest::TRequest* ProtoRequest = nullptr; - mutable std::shared_ptr Batch; - public: - TProtoDataWrapper(const TEvLongTxWriteRequest::TRequest* request) - : ProtoRequest(request) - { - } - - std::shared_ptr GetDeserializedBatch() const override { - if (Batch) { - return Batch; - } else { - auto res = NArrow::NSerialization::TFullDataDeserializer().Deserialize(GetSerializedData()); - if (res.ok()) { - Batch = *res; - } - } - return Batch; - } - - TString GetSerializedData() const override { - Y_ABORT_UNLESS(ProtoRequest); - return ProtoRequest->data().data(); - } - }; - - NEvWrite::IShardsSplitter::IEvWriteDataAccessor::TPtr DataAccessor; -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - explicit TLongTxWriteRPC(std::unique_ptr request) - : TBase(request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())), - TEvLongTxWriteRequest::GetProtoRequest(request)->path(), - request->GetSerializedToken(), - TLongTxId(), - TEvLongTxWriteRequest::GetProtoRequest(request)->dedup_id()) - , Request(std::move(request)) - , SchemeCache(MakeSchemeCacheID()) - { - DataAccessor = std::make_shared(GetProtoRequest()); - } - - void Bootstrap() { - const auto* req = GetProtoRequest(); - - TString errMsg; - if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { - return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg); - } - - if (GetProtoRequest()->data().format() != Ydb::LongTx::Data::APACHE_ARROW) { - return ReplyError(Ydb::StatusIds::BAD_REQUEST, "Only APACHE_ARROW data format is supported"); - } - - SendNavigateRequest(); - } - - void SendNavigateRequest() { - auto request = MakeHolder(); - request->DatabaseName = this->DatabaseName; - auto& entry = request->ResultSet.emplace_back(); - entry.Path = ::NKikimr::SplitPath(Path); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; - Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); - Become(&TThis::StateNavigate); - } - - STFUNC(StateNavigate) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - } - } - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get(); - Y_ABORT_UNLESS(resp); - ProceedWithSchema(*resp); - } - -private: - const TEvLongTxWriteRequest::TRequest* GetProtoRequest() const { - return TEvLongTxWriteRequest::GetProtoRequest(Request); - } - -protected: - NEvWrite::IShardsSplitter::IEvWriteDataAccessor& GetDataAccessor() const override { - return *DataAccessor; - } - - void RaiseIssue(const NYql::TIssue& issue) override { - Request->RaiseIssue(issue); - } - - void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) override { - if (!message.empty()) { - Request->RaiseIssue(NYql::TIssue(message)); - } - Request->ReplyWithYdbStatus(status); - PassAway(); - } - - void ReplySuccess() override { - Ydb::LongTx::WriteResult result; - result.set_tx_id(GetProtoRequest()->tx_id()); - result.set_path(Path); - result.set_dedup_id(DedupId); - - Request->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } - -private: - std::unique_ptr Request; - TActorId SchemeCache; -}; - - -template<> -IActor* TEvLongTxWriteRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { - return new TLongTxWriteRPC(std::unique_ptr(msg)); -} - // LongTx Write implementation called from the inside of YDB (e.g. as a part of BulkUpsert call) // NOTE: permission checks must have been done by the caller class TLongTxWriteInternal : public TLongTxWriteBase { @@ -630,21 +272,6 @@ TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& repl // -void DoLongTxBeginRPC(std::unique_ptr p, const IFacilityProvider& f) { - f.RegisterActor(new TLongTxBeginRPC(std::move(p))); -} - -void DoLongTxCommitRPC(std::unique_ptr p, const IFacilityProvider& f) { - f.RegisterActor(new TLongTxCommitRPC(std::move(p))); -} - -void DoLongTxRollbackRPC(std::unique_ptr p, const IFacilityProvider& f) { - f.RegisterActor(new TLongTxRollbackRPC(std::move(p))); -} - -void DoLongTxWriteRPC(std::unique_ptr p, const IFacilityProvider& f) { - f.RegisterActor(new TLongTxWriteRPC(std::move(p))); -} } } diff --git a/ydb/core/grpc_services/service_longtx.h b/ydb/core/grpc_services/service_longtx.h deleted file mode 100644 index 2d46a639eecd..000000000000 --- a/ydb/core/grpc_services/service_longtx.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once -#include - -namespace NKikimr { -namespace NGRpcService { - -class IRequestOpCtx; -class IFacilityProvider; - -void DoLongTxBeginRPC(std::unique_ptr p, const IFacilityProvider& f); -void DoLongTxCommitRPC(std::unique_ptr p, const IFacilityProvider& f); -void DoLongTxRollbackRPC(std::unique_ptr p, const IFacilityProvider& f); -void DoLongTxWriteRPC(std::unique_ptr p, const IFacilityProvider& f); - -} -} diff --git a/ydb/public/api/grpc/draft/CMakeLists.darwin-arm64.txt b/ydb/public/api/grpc/draft/CMakeLists.darwin-arm64.txt index 5ec7da389c4a..ec91e2feae26 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.darwin-arm64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.darwin-arm64.txt @@ -138,7 +138,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_maintenance_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/CMakeLists.darwin-x86_64.txt b/ydb/public/api/grpc/draft/CMakeLists.darwin-x86_64.txt index 5ec7da389c4a..ec91e2feae26 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.darwin-x86_64.txt @@ -138,7 +138,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_maintenance_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt index 501f71bd6ddf..bf26eb685a8e 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.linux-aarch64.txt @@ -139,7 +139,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_maintenance_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/CMakeLists.linux-x86_64.txt b/ydb/public/api/grpc/draft/CMakeLists.linux-x86_64.txt index 501f71bd6ddf..bf26eb685a8e 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.linux-x86_64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.linux-x86_64.txt @@ -139,7 +139,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_maintenance_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/CMakeLists.windows-x86_64.txt b/ydb/public/api/grpc/draft/CMakeLists.windows-x86_64.txt index 5ec7da389c4a..ec91e2feae26 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.windows-x86_64.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.windows-x86_64.txt @@ -138,7 +138,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_maintenance_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/ya.make b/ydb/public/api/grpc/draft/ya.make index f508a9309e46..908ae4985028 100644 --- a/ydb/public/api/grpc/draft/ya.make +++ b/ydb/public/api/grpc/draft/ya.make @@ -10,7 +10,6 @@ SRCS( ydb_clickhouse_internal_v1.proto ydb_persqueue_v1.proto ydb_datastreams_v1.proto - ydb_long_tx_v1.proto ydb_maintenance_v1.proto ydb_logstore_v1.proto ydb_dynamic_config_v1.proto diff --git a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto deleted file mode 100644 index 7f920b44af39..000000000000 --- a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - -package Ydb.LongTx.V1; -option java_package = "com.yandex.ydb.long_tx.v1"; - -import "ydb/public/api/protos/draft/ydb_long_tx.proto"; - -service LongTxService { - rpc BeginTx(BeginTransactionRequest) returns (BeginTransactionResponse); - rpc CommitTx(CommitTransactionRequest) returns (CommitTransactionResponse); - rpc RollbackTx(RollbackTransactionRequest) returns (RollbackTransactionResponse); - rpc Write(WriteRequest) returns (WriteResponse); -// rpc ResolveNodes(ResolveNodesRequest) returns (stream ResolveNodesResponse); -} diff --git a/ydb/public/api/protos/CMakeLists.darwin-arm64.txt b/ydb/public/api/protos/CMakeLists.darwin-arm64.txt index b9750e7dc20a..472587120819 100644 --- a/ydb/public/api/protos/CMakeLists.darwin-arm64.txt +++ b/ydb/public/api/protos/CMakeLists.darwin-arm64.txt @@ -460,7 +460,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_common.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_maintenance.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_dynamic_config.proto diff --git a/ydb/public/api/protos/CMakeLists.darwin-x86_64.txt b/ydb/public/api/protos/CMakeLists.darwin-x86_64.txt index b9750e7dc20a..472587120819 100644 --- a/ydb/public/api/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/api/protos/CMakeLists.darwin-x86_64.txt @@ -460,7 +460,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_common.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_maintenance.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_dynamic_config.proto diff --git a/ydb/public/api/protos/CMakeLists.linux-aarch64.txt b/ydb/public/api/protos/CMakeLists.linux-aarch64.txt index 5b0fa3407712..ff7dd71ab01c 100644 --- a/ydb/public/api/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/public/api/protos/CMakeLists.linux-aarch64.txt @@ -461,7 +461,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_common.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_maintenance.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_dynamic_config.proto diff --git a/ydb/public/api/protos/CMakeLists.linux-x86_64.txt b/ydb/public/api/protos/CMakeLists.linux-x86_64.txt index 5b0fa3407712..ff7dd71ab01c 100644 --- a/ydb/public/api/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/public/api/protos/CMakeLists.linux-x86_64.txt @@ -461,7 +461,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_common.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_maintenance.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_dynamic_config.proto diff --git a/ydb/public/api/protos/CMakeLists.windows-x86_64.txt b/ydb/public/api/protos/CMakeLists.windows-x86_64.txt index b9750e7dc20a..472587120819 100644 --- a/ydb/public/api/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/public/api/protos/CMakeLists.windows-x86_64.txt @@ -460,7 +460,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_common.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_maintenance.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_dynamic_config.proto diff --git a/ydb/public/api/protos/draft/ydb_long_tx.proto b/ydb/public/api/protos/draft/ydb_long_tx.proto deleted file mode 100644 index d5b658595fdd..000000000000 --- a/ydb/public/api/protos/draft/ydb_long_tx.proto +++ /dev/null @@ -1,89 +0,0 @@ -syntax = "proto3"; -option cc_enable_arenas = true; - -package Ydb.LongTx; -option java_package = "com.yandex.ydb.long_tx"; -option java_outer_classname = "LongTxProtos"; - -import "ydb/public/api/protos/ydb_operation.proto"; - -message Data -{ - enum Format { - FORMAT_UNSPECIFIED = 0x0000; - YDB_ROWS = 0x0001; - //CLICKHOUSE = 0x0002; - APACHE_ARROW = 0x0003; - //APACHE_PARQUET = 0x0004; - //APACHE_ORC = 0x0005; - } - - Format format = 1; - bytes data = 2; -} - -message BeginTransactionRequest { - enum TxTypeId { - TX_TYPE_ID_UNSPECIFIED = 0x0000; - WRITE = 0x0001; - READ = 0x0002; - //UPDATE = 0x0003; - //DELETE = 0x0004; - } - - Ydb.Operations.OperationParams operation_params = 1; - TxTypeId tx_type = 2; -} - -message BeginTransactionResult { - // Transaction id required for other requests. - string tx_id = 1; -} - -message BeginTransactionResponse { - Ydb.Operations.Operation operation = 1; -} - -message CommitTransactionRequest { - Ydb.Operations.OperationParams operation_params = 1; - string tx_id = 2; -} - -message CommitTransactionResult { - string tx_id = 1; -} - -message CommitTransactionResponse { - Ydb.Operations.Operation operation = 1; -} - -message RollbackTransactionRequest { - Ydb.Operations.OperationParams operation_params = 1; - string tx_id = 2; -} - -message RollbackTransactionResult { - string tx_id = 1; -} - -message RollbackTransactionResponse { - Ydb.Operations.Operation operation = 1; -} - -message WriteRequest { - Ydb.Operations.OperationParams operation_params = 1; - string tx_id = 2; - string path = 3; - string dedup_id = 4; - Data data = 5; -} - -message WriteResult { - string tx_id = 1; - string path = 2; - string dedup_id = 3; -} - -message WriteResponse { - Ydb.Operations.Operation operation = 1; -} diff --git a/ydb/public/api/protos/ya.make b/ydb/public/api/protos/ya.make index e21024ef0aa8..2b0ea3e8d39a 100644 --- a/ydb/public/api/protos/ya.make +++ b/ydb/public/api/protos/ya.make @@ -11,7 +11,6 @@ SRCS( draft/fq.proto draft/persqueue_common.proto draft/persqueue_error_codes.proto - draft/ydb_long_tx.proto draft/ydb_maintenance.proto draft/ydb_logstore.proto draft/ydb_dynamic_config.proto diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-arm64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-arm64.txt index f030fbc25cdc..5b65c5c25dc4 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-arm64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-arm64.txt @@ -20,5 +20,4 @@ target_link_libraries(cpp-client-draft PUBLIC target_sources(cpp-client-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_dynamic_config.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_scripting.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-x86_64.txt index f030fbc25cdc..5b65c5c25dc4 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin-x86_64.txt @@ -20,5 +20,4 @@ target_link_libraries(cpp-client-draft PUBLIC target_sources(cpp-client-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_dynamic_config.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_scripting.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt index c4dd9ff9e7e0..586b17a94d25 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt @@ -21,5 +21,4 @@ target_link_libraries(cpp-client-draft PUBLIC target_sources(cpp-client-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_dynamic_config.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_scripting.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-x86_64.txt index c4dd9ff9e7e0..586b17a94d25 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-x86_64.txt @@ -21,5 +21,4 @@ target_link_libraries(cpp-client-draft PUBLIC target_sources(cpp-client-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_dynamic_config.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_scripting.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.windows-x86_64.txt index f030fbc25cdc..5b65c5c25dc4 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.windows-x86_64.txt @@ -20,5 +20,4 @@ target_link_libraries(cpp-client-draft PUBLIC target_sources(cpp-client-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_dynamic_config.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_scripting.cpp - ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/ya.make b/ydb/public/sdk/cpp/client/draft/ya.make index 91b96fc5d04f..7d4f206e71be 100644 --- a/ydb/public/sdk/cpp/client/draft/ya.make +++ b/ydb/public/sdk/cpp/client/draft/ya.make @@ -3,7 +3,6 @@ LIBRARY() SRCS( ydb_dynamic_config.cpp ydb_scripting.cpp - ydb_long_tx.cpp ) PEERDIR( diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp deleted file mode 100644 index 06ef07b11135..000000000000 --- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp +++ /dev/null @@ -1,105 +0,0 @@ -#include "ydb_long_tx.h" - -#define INCLUDE_YDB_INTERNAL_H -#include -#undef INCLUDE_YDB_INTERNAL_H - -#include - -namespace NYdb { -namespace NLongTx { - -namespace { - -struct TOpSettings : public TOperationRequestSettings { -}; - -} - -class TClient::TImpl: public TClientImplCommon { -public: - TImpl(std::shared_ptr&& connections, const TClientSettings& settings) - : TClientImplCommon(std::move(connections), settings) - {} - - TAsyncBeginTxResult BeginTx(Ydb::LongTx::BeginTransactionRequest::TxTypeId txType, - const TOpSettings& settings = TOpSettings()) { - auto request = MakeOperationRequest(settings); - request.set_tx_type(txType); - - return RunOperation( - std::move(request), - &Ydb::LongTx::V1::LongTxService::Stub::AsyncBeginTx, - TRpcRequestSettings::Make(settings)); - } - - TAsyncCommitTxResult CommitTx(const TString& txId, - const TOpSettings& settings = TOpSettings()) { - auto request = MakeOperationRequest(settings); - request.set_tx_id(txId); - - return RunOperation( - std::move(request), - &Ydb::LongTx::V1::LongTxService::Stub::AsyncCommitTx, - TRpcRequestSettings::Make(settings)); - } - - TAsyncRollbackTxResult RollbackTx(const TString& txId, - const TOpSettings& settings = TOpSettings()) { - auto request = MakeOperationRequest(settings); - request.set_tx_id(txId); - - return RunOperation( - std::move(request), - &Ydb::LongTx::V1::LongTxService::Stub::AsyncRollbackTx, - TRpcRequestSettings::Make(settings)); - } - - TAsyncWriteResult Write(const TString& txId, const TString& table, const TString& dedupId, - const TString& data, Ydb::LongTx::Data::Format format, - const TOpSettings& settings = TOpSettings()) { - auto request = MakeOperationRequest(settings); - request.set_tx_id(txId); - request.set_path(table); - request.set_dedup_id(dedupId); - - auto req_data = request.mutable_data(); - req_data->set_format(format); - req_data->set_data(data); - - return RunOperation( - std::move(request), - &Ydb::LongTx::V1::LongTxService::Stub::AsyncWrite, - TRpcRequestSettings::Make(settings)); - } - -}; - -TClient::TClient(const TDriver& driver, const TClientSettings& settings) - : Impl_(new TImpl(CreateInternalInterface(driver), settings)) -{} - -TClient::TAsyncBeginTxResult TClient::BeginWriteTx() { - return Impl_->BeginTx(Ydb::LongTx::BeginTransactionRequest::WRITE); -} - -TClient::TAsyncCommitTxResult TClient::CommitTx(const TString& txId) { - return Impl_->CommitTx(txId); -} - -TClient::TAsyncRollbackTxResult TClient::RollbackTx(const TString& txId) { - return Impl_->RollbackTx(txId); -} - -TClient::TAsyncWriteResult TClient::Write(const TString& txId, const TString& table, const TString& dedupId, - const TString& data, Ydb::LongTx::Data::Format format) { - return Impl_->Write(txId, table, dedupId, data, format); -} - - -} // namespace NLongTx -} // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h deleted file mode 100644 index e749d0348ab8..000000000000 --- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h +++ /dev/null @@ -1,103 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace NYdb { -namespace NLongTx { - -class TLongTxBeginResult : public TOperation { -public: - explicit TLongTxBeginResult(TStatus&& status) - : TOperation(std::move(status)) - {} - - TLongTxBeginResult(TStatus&& status, Ydb::Operations::Operation&& operation) - : TOperation(std::move(status), std::move(operation)) - {} - - Ydb::LongTx::BeginTransactionResult GetResult() { - Ydb::LongTx::BeginTransactionResult result; - GetProto().result().UnpackTo(&result); - return result; - } -}; - -class TLongTxCommitResult : public TOperation { -public: - explicit TLongTxCommitResult(TStatus&& status) - : TOperation(std::move(status)) - {} - - TLongTxCommitResult(TStatus&& status, Ydb::Operations::Operation&& operation) - : TOperation(std::move(status), std::move(operation)) - {} - - Ydb::LongTx::CommitTransactionResult GetResult() { - Ydb::LongTx::CommitTransactionResult result; - GetProto().result().UnpackTo(&result); - return result; - } -}; - -class TLongTxRollbackResult : public TOperation { -public: - explicit TLongTxRollbackResult(TStatus&& status) - : TOperation(std::move(status)) - {} - - TLongTxRollbackResult(TStatus&& status, Ydb::Operations::Operation&& operation) - : TOperation(std::move(status), std::move(operation)) - {} - - Ydb::LongTx::RollbackTransactionResult GetResult() { - Ydb::LongTx::RollbackTransactionResult result; - GetProto().result().UnpackTo(&result); - return result; - } -}; - -class TLongTxWriteResult : public TOperation { -public: - explicit TLongTxWriteResult(TStatus&& status) - : TOperation(std::move(status)) - {} - - TLongTxWriteResult(TStatus&& status, Ydb::Operations::Operation&& operation) - : TOperation(std::move(status), std::move(operation)) - {} - - Ydb::LongTx::WriteResult GetResult() { - Ydb::LongTx::WriteResult result; - GetProto().result().UnpackTo(&result); - return result; - } -}; - -struct TClientSettings : public TCommonClientSettingsBase { - using TSelf = TClientSettings; -}; - -class TClient { -public: - using TAsyncBeginTxResult = NThreading::TFuture; - using TAsyncCommitTxResult = NThreading::TFuture; - using TAsyncRollbackTxResult = NThreading::TFuture; - using TAsyncWriteResult = NThreading::TFuture; - - TClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); - - TAsyncBeginTxResult BeginWriteTx(); - TAsyncCommitTxResult CommitTx(const TString& txId); - TAsyncRollbackTxResult RollbackTx(const TString& txId); - TAsyncWriteResult Write(const TString& txId, const TString& table, const TString& dedupId, - const TString& data, Ydb::LongTx::Data::Format format); - -private: - class TImpl; - std::shared_ptr Impl_; -}; - -} // namespace NLongTx -} // namespace NYdb diff --git a/ydb/services/ydb/CMakeLists.darwin-arm64.txt b/ydb/services/ydb/CMakeLists.darwin-arm64.txt index 86a699d8001d..1eb633ff6647 100644 --- a/ydb/services/ydb/CMakeLists.darwin-arm64.txt +++ b/ydb/services/ydb/CMakeLists.darwin-arm64.txt @@ -45,5 +45,4 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx.cpp ) diff --git a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt index 86a699d8001d..1eb633ff6647 100644 --- a/ydb/services/ydb/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.darwin-x86_64.txt @@ -45,5 +45,4 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx.cpp ) diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt index 959b3536694c..53904b2d393b 100644 --- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt @@ -46,5 +46,4 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx.cpp ) diff --git a/ydb/services/ydb/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/CMakeLists.linux-x86_64.txt index 959b3536694c..53904b2d393b 100644 --- a/ydb/services/ydb/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.linux-x86_64.txt @@ -46,5 +46,4 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx.cpp ) diff --git a/ydb/services/ydb/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/CMakeLists.windows-x86_64.txt index 86a699d8001d..1eb633ff6647 100644 --- a/ydb/services/ydb/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/CMakeLists.windows-x86_64.txt @@ -45,5 +45,4 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_long_tx.cpp ) diff --git a/ydb/services/ydb/ya.make b/ydb/services/ydb/ya.make index 74632bf34d20..26c9ef2a1db9 100644 --- a/ydb/services/ydb/ya.make +++ b/ydb/services/ydb/ya.make @@ -11,7 +11,6 @@ SRCS( ydb_scheme.cpp ydb_scripting.cpp ydb_table.cpp - ydb_long_tx.cpp ) PEERDIR( diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp deleted file mode 100644 index 643d95b858a3..000000000000 --- a/ydb/services/ydb/ydb_long_tx.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "ydb_long_tx.h" - -#include -#include -#include - -namespace NKikimr { -namespace NGRpcService { - -void TGRpcYdbLongTxService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { - auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); - -#ifdef ADD_REQUEST -#error ADD_REQUEST macro already defined -#endif -#define ADD_REQUEST(NAME, REQ, CB) \ - MakeIntrusive> \ - (this, &Service_, CQ_, \ - [this](NYdbGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcRequestOperationCall \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Off, nullptr})); \ - }, &Ydb::LongTx::V1::LongTxService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("long_tx", #NAME))->Run(); - - ADD_REQUEST(BeginTx, BeginTransaction, DoLongTxBeginRPC) - ADD_REQUEST(CommitTx, CommitTransaction, DoLongTxCommitRPC) - ADD_REQUEST(RollbackTx, RollbackTransaction, DoLongTxRollbackRPC) - ADD_REQUEST(Write, Write, DoLongTxWriteRPC) -#undef ADD_REQUEST -} - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/services/ydb/ydb_long_tx.h b/ydb/services/ydb/ydb_long_tx.h deleted file mode 100644 index 645459f5fd90..000000000000 --- a/ydb/services/ydb/ydb_long_tx.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace NKikimr { -namespace NGRpcService { - -class TGRpcYdbLongTxService - : public TGrpcServiceBase -{ -public: - using TGrpcServiceBase::TGrpcServiceBase; -private: - void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); -}; - -} // namespace NGRpcService -} // namespace NKikimr From f2ba3884a2185cb42c6f83ff7e3f8b7cd6e6029b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Tue, 16 Jan 2024 17:28:58 +0300 Subject: [PATCH 5/8] fix build --- ydb/core/driver_lib/run/run.cpp | 12 ------------ ydb/core/testlib/test_client.cpp | 8 -------- ydb/core/tx/data_events/shard_writer.h | 1 - ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 2 -- ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 1 - 5 files changed, 24 deletions(-) diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 160affb9d238..cfdeb9ec567f 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -111,7 +111,6 @@ #include #include #include -#include #include #include #include @@ -565,8 +564,6 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["clickhouse_internal"] = &hasClickhouseInternal; TServiceCfg hasRateLimiter = false; names["rate_limiter"] = &hasRateLimiter; - TServiceCfg hasLongTx = false; - names["long_tx"] = &hasLongTx; TServiceCfg hasExport = services.empty(); names["export"] = &hasExport; TServiceCfg hasImport = services.empty(); @@ -726,11 +723,6 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { grpcRequestProxies[0], hasScripting.IsRlAllowed())); } - if (hasLongTx) { - server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters, - grpcRequestProxies[0], hasLongTx.IsRlAllowed())); - } - if (hasSchemeService) { // RPC RL enabled // We have no way to disable or enable this service explicitly @@ -1576,10 +1568,6 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TAuditWriterInitializer(runConfig)); } - if (serviceMask.EnableLongTxService) { - sil->AddServiceInitializer(new TLongTxServiceInitializer(runConfig)); - } - if (serviceMask.EnableKqp || serviceMask.EnableYandexQuery) { sil->AddServiceInitializer(new TYqlLogsInitializer(runConfig)); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index bc4a5645342b..d6fcf947cd37 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -385,7 +384,6 @@ namespace Tests { GRpcServer->AddService(discoveryService); GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies[0], true)); GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); - GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true)); @@ -905,12 +903,6 @@ namespace Tests { Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx); } - { - IActor* longTxService = NLongTxService::CreateLongTxService(); - TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx); - Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx); - } - { IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx); diff --git a/ydb/core/tx/data_events/shard_writer.h b/ydb/core/tx/data_events/shard_writer.h index 7357c22d29c2..fe713d1960be 100644 --- a/ydb/core/tx/data_events/shard_writer.h +++ b/ydb/core/tx/data_events/shard_writer.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 7388dfc339c2..907578f0346d 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -23,7 +23,6 @@ #include #include -#include #define INCLUDE_YDB_INTERNAL_H #include @@ -169,7 +168,6 @@ class TUploadRowsBase : public TActorBootstrapped Issues = std::make_shared(); NLongTxService::TLongTxId LongTxId; - NThreading::TFuture WriteBatchResult; TUploadCounters UploadCounters; protected: diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 6f6f9533968d..4ab649e0e446 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include From 69b7954e7a850b6c8c17637a592c860449449d53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Thu, 18 Jan 2024 14:08:11 +0300 Subject: [PATCH 6/8] fix tests --- ydb/core/driver_lib/run/run.cpp | 4 ++++ ydb/core/testlib/test_client.cpp | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index cfdeb9ec567f..69ac30badc98 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1568,6 +1568,10 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TAuditWriterInitializer(runConfig)); } + if (serviceMask.EnableLongTxService) { + sil->AddServiceInitializer(new TLongTxServiceInitializer(runConfig)); + } + if (serviceMask.EnableKqp || serviceMask.EnableYandexQuery) { sil->AddServiceInitializer(new TYqlLogsInitializer(runConfig)); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index d6fcf947cd37..609cedf3184f 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -903,6 +903,12 @@ namespace Tests { Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx); } + { + IActor* longTxService = NLongTxService::CreateLongTxService(); + TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx); + Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx); + } + { IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx); From 3c3890b6aad4ad8f684823af128971c4574db087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Thu, 18 Jan 2024 14:59:40 +0300 Subject: [PATCH 7/8] fix build --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 208b639948c4..4a69d43ef61f 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -531,7 +531,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } }; - void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) { + void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool /*withSomeNulls = false*/) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead TLocalHelper lHelper(kikimr); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); From 9d9a10b9b8303142290133ca282a835063ee38d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3?= Date: Thu, 18 Jan 2024 17:39:45 +0300 Subject: [PATCH 8/8] another fix build --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 4a69d43ef61f..12408595b220 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -531,9 +531,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } }; - void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool /*withSomeNulls = false*/) { + void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead TLocalHelper lHelper(kikimr); + if (withSomeNulls) + lHelper.WithSomeNulls(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); lHelper.SendDataViaActorSystem(testTable, batch); }