Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1");
SELECT * FROM TestDdlDml2;
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1");
SELECT * FROM TestDdlDml2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
CREATE TABLE TestDdlDml33 (
Expand All @@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/alter_table_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
}

TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
DataShard.AddUserTable(tableId, info);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(tableId, info, &locksDb);

if (info->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/create_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -40,7 +41,8 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
Y_ABORT_UNLESS(version);

auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1872,7 +1872,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsNeedUpdate = true;

TDataShardLocksDb locksDb(*this, txc);

RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);

Expand Down Expand Up @@ -1951,8 +1950,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
}

newTableInfo->SetSchema(schema);

AddUserTable(pathId, newTableInfo);
TDataShardLocksDb locksDb(*this, txc);
AddUserTable(pathId, newTableInfo, &locksDb);

if (newTableInfo->NeedSchemaSnapshots()) {
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,10 @@ class TDataShard
TableInfos.erase(tableId.LocalPathId);
}

void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
if (locksDb) {
SysLocks.RemoveSchema(tableId, locksDb);
}
TableInfos[tableId.LocalPathId] = tableInfo;
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);
Expand Down
38 changes: 38 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3682,6 +3682,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
MustNotLoseSchemaSnapshot(true);
}

Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));

ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");

TString sessionId;
TString txId;
KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");

WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
"ERROR: ABORTED");

WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"update":{"value":10},"key":[1]})",
});
}

Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -40,7 +41,8 @@ class TDropCdcStreamUnit : public TExecutionUnit {
Y_ABORT_UNLESS(version);

auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/drop_index_notice_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -52,7 +53,8 @@ class TDropIndexNoticeUnit : public TExecutionUnit {
}

Y_ABORT_UNLESS(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/initiate_build_index_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -53,7 +54,8 @@ class TInitiateBuildIndexUnit : public TExecutionUnit {
}

Y_ABORT_UNLESS(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
Loading