diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index af469721619e..9ad27838404d 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -10,9 +10,7 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.* ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization ydb/core/kqp/ut/olap KqpOlapBlobsSharing.* -ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL -ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL ydb/core/kqp/ut/pg KqpPg.CreateIndex ydb/core/kqp/ut/query KqpLimits.QueryReplySize diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index e3209f08dec9..12ed99ef6c30 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig for (const auto &input : stage.GetInputs()) { hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup; } + + for (const auto &tableOp : stage.GetTableOps()) { + if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + // always need snapshot for OLAP reads + return true; + } + } } } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 38447c484c8c..7ecac4e0aa6b 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -313,10 +313,6 @@ class TKqpQueryState : public TNonCopyable { bool NeedPersistentSnapshot() const { auto type = GetType(); - if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || - type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) { - return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery()); - } return ( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b66c28e20be1..d304fc06768a 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -832,9 +832,10 @@ class TKqpSessionActor : public TActorBootstrapped { const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (HasOlapTable && HasOltpTable) { + HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + if (HasOlapTable && HasOltpTable && HasTableWrite) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, - "Transactions between column and row tables are disabled at current time."); + "Write transactions between column and row tables are disabled at current time."); return false; } QueryState->TxCtx->SetTempTables(QueryState->TempTablesState); @@ -2525,6 +2526,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool HasOlapTable = false; bool HasOltpTable = false; + bool HasTableWrite = false; TGUCSettings::TPtr GUCSettings; }; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 1469403863ae..648734a0674e 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2735,7 +2735,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2748,20 +2748,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), - insertResult.GetIssues().ToString()); - } - - { - // column & row read - const TString sql = R"( - SELECT * FROM `/Root/DataShard`; - SELECT * FROM `/Root/ColumnShard`; - )"; - auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT(!insertResult.IsSuccess()); - UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2776,7 +2763,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2790,7 +2777,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2804,7 +2791,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } } @@ -3479,6 +3466,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(ReadDatashardAndColumnshard) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto client = kikimr.GetQueryClient(); + + { + auto createTable = client.ExecuteQuery(R"sql( + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + Col3 String, + PRIMARY KEY (Col1) + ) WITH ( + STORE = ROW, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + Col3 String, + PRIMARY KEY (Col1) + ) WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + )sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString()); + } + + { + auto replaceValues = client.ExecuteQuery(R"sql( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, 1, "row"); + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString()); + } + + { + auto replaceValues = client.ExecuteQuery(R"sql( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (2u, 2, "column"); + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString()); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT * FROM `/Root/ColumnShard`; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[2u;[2];["column"]]])"); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT * FROM `/Root/DataShard` + UNION ALL + SELECT * FROM `/Root/ColumnShard`; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[1u;[1];["row"]];[2u;[2];["column"]]])"); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r + JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[["row"];["column"]]])"); + } + } + Y_UNIT_TEST(ReplaceIntoWithDefaultValue) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);