diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index efe4b46b5dd8..660ed7ad5e52 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -52,6 +52,7 @@ class TEngineBay : TNonCopyable { const NMiniKQL::IEngineFlat * GetEngine() const { return Engine.Get(); } NMiniKQL::IEngineFlat * GetEngine(); + NMiniKQL::TEngineHost * GetEngineHost() { return EngineHost.Get(); } void SetLockTxId(ui64 lockTxId, ui32 lockNodeId); void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 9e95d424f71a..e6cf62d36547 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1580,7 +1580,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& return writeOp; } - writeOp->ExtractKeys(); + writeTx->ExtractKeys(true); switch (rec.txmode()) { case NKikimrDataEvents::TEvWrite::MODE_PREPARE: diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index a34be8bfc22f..9d485d3e30ff 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -735,6 +735,16 @@ struct TTestHelper { UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit)); } + void WriteRowTwin(const TString& tableName, const TVector& values, bool isEvWrite) { + if(isEvWrite) + WriteRow(tableName, ++TxId, values); + else + ExecSQL(Server, Sender, TStringBuilder() + << "UPSERT INTO `/Root/" << tableName << "`\n" + << "(" << JoinSeq(",", MakeMappedRange(Tables[tableName].Columns, [](const auto& col) { return col.Name; })) << ")\n" + << "VALUES\n(" << JoinSeq(",", values) << ");"); + } + NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) { const auto& table = Tables[tableName]; @@ -909,6 +919,7 @@ struct TTestHelper { ui64 ShardCount = 1; Tests::TServer::TPtr Server; TActorId Sender; + ui64 TxId = 100; THashMap Tables; }; @@ -3110,34 +3121,30 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(readResults, 0); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKey) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKey, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); AddKeyQuery(*request1, {1, 1, 1}); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); // breaks lock obtained above - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (1, 1, 1, 101); - )"); + helper.WriteRowTwin(tableName, {1, 1, 1, 101}, EvWrite); // we use request2 to obtain same lock as in request1 to check it - auto request2 = helper.GetBaseReadRequest("table-1", 1); + auto request2 = helper.GetBaseReadRequest(tableName, 1); request2->Record.SetLockTxId(lockTxId); AddKeyQuery(*request2, {1, 1, 1}); - auto readResult2 = helper.SendRead("table-1", request2.release()); + auto readResult2 = helper.SendRead(tableName, request2.release()); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.TxLocksSize(), 0); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.BrokenTxLocksSize(), 1); @@ -3148,62 +3155,35 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRange) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRange, EvWrite) { // upsert into "left border -1 " and to the "right border + 1" - lock not broken // upsert inside range - broken TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; + const TVector checkKey = {11, 11, 11}; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request1, - {3, 3, 3}, - true, - {8, 0, 1}, - true - ); + AddRangeQuery(*request1, {3, 3, 3}, true, {8, 0, 1}, true); + auto readResult1 = helper.SendRead(tableName, request1.release()); - auto readResult1 = helper.SendRead("table-1", request1.release()); + // upsert to the left and check that lock is not broken + helper.WriteRowTwin(tableName, {1, 1, 1, 101}, EvWrite); + helper.CheckLockValid(tableName, 2, checkKey, lockTxId); - { - // upsert to the left and check that lock is not broken - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (1, 1, 1, 101); - )"); - - helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId); - } - - { - // upsert to the right and check that lock is not broken - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (8, 1, 0, 802); - )"); - - helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId); - } + // upsert to the right and check that lock is not broken + helper.WriteRowTwin(tableName, {8, 1, 0, 802}, EvWrite); + helper.CheckLockValid(tableName, 2, checkKey, lockTxId); // breaks lock // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (4, 4, 4, 400); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {4, 4, 4, 400}, EvWrite); + helper.CheckLockBroken(tableName, 3, checkKey, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips, EvWrite) { // If we read in v1, write in v2, then write breaks lock. // Because of out of order execution, v2 can happen before v1 // and we should properly handle it in DS to break lock. @@ -3217,29 +3197,20 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { {"/Root/movies", "/Root/table-1"}, TDuration::Hours(1)); + const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; + // write new data above snapshot - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (4, 4, 4, 4444); - )"); + helper.WriteRowTwin(tableName, {4, 4, 4, 44441}, EvWrite); - const ui64 lockTxId = 1011121314; - auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); + auto request1 = helper.GetBaseReadRequest(tableName, 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); request1->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request1, - {1, 1, 1}, - true, - {5, 5, 5}, - true - ); + AddRangeQuery(*request1, {1, 1, 1}, true, {5, 5, 5}, true); - auto readResult1 = helper.SendRead("table-1", request1.release()); - CheckResult(helper.Tables["table-1"].UserTable, *readResult1, { + auto readResult1 = helper.SendRead(tableName, request1.release()); + CheckResult(helper.Tables[tableName].UserTable, *readResult1, { {1, 1, 1, 100}, {3, 3, 3, 300}, {5, 5, 5, 500}, @@ -3248,7 +3219,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 0); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 1); - helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1); + helper.CheckLockBroken(tableName, 10, {11, 11, 11}, lockTxId, *readResult1); } Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeInvisibleRowSkips2) { @@ -3265,19 +3236,14 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { TDuration::Hours(1)); const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); + auto request1 = helper.GetBaseReadRequest(tableName, 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); request1->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request1, - {100, 0, 0}, - true, - {200, 0, 0}, - true - ); + AddRangeQuery(*request1, {100, 0, 0}, true, {200, 0, 0}, true); - auto readResult1 = helper.SendRead("table-1", request1.release()); - CheckResult(helper.Tables["table-1"].UserTable, *readResult1, {}); + auto readResult1 = helper.SendRead(tableName, request1.release()); + CheckResult(helper.Tables[tableName].UserTable, *readResult1, {}); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); @@ -3290,199 +3256,136 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { (300, 0, 0, 3000); )"); - auto request2 = helper.GetBaseReadRequest("table-1", 2, NKikimrDataEvents::FORMAT_ARROW, readVersion); + auto request2 = helper.GetBaseReadRequest(tableName, 2, NKikimrDataEvents::FORMAT_ARROW, readVersion); request2->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request2, - {300, 0, 0}, - true, - {300, 0, 0}, - true - ); + AddRangeQuery(*request2, {300, 0, 0}, true, {300, 0, 0}, true); - auto readResult2 = helper.SendRead("table-1", request2.release()); + auto readResult2 = helper.SendRead(tableName, request2.release()); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.TxLocksSize(), 0); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.BrokenTxLocksSize(), 1); - helper.CheckLockBroken("table-1", 10, {300, 0, 0}, lockTxId, *readResult2); + helper.CheckLockBroken(tableName, 10, {300, 0, 0}, lockTxId, *readResult2); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeLeftBorder) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRangeLeftBorder, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request1, - {3, 3, 3}, - true, - {8, 0, 1}, - true - ); + AddRangeQuery(*request1, {3, 3, 3}, true, {8, 0, 1}, true); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); // breaks lock // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (3, 3, 3, 0xdead); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {3, 3, 3, 0xdead}, EvWrite); + helper.CheckLockBroken(tableName, 3, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadRangeRightBorder) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadRangeRightBorder, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); - AddRangeQuery( - *request1, - {3, 3, 3}, - true, - {8, 0, 1}, - true - ); + AddRangeQuery(*request1, {3, 3, 3}, true, {8, 0, 1}, true); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); // breaks lock // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (8, 0, 1, 0xdead); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {8, 0, 1, 0xdead}, EvWrite); + helper.CheckLockBroken(tableName, 3, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyPrefix) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKeyPrefix, EvWrite) { // upsert into "left border -1 " and to the "right border + 1" - lock not broken // upsert inside range - broken TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); AddKeyQuery(*request1, {8}); - auto readResult1 = helper.SendRead("table-1", request1.release()); - - { - // upsert to the left and check that lock is not broken - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (5, 5, 5, 555); - )"); - - helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId); - } + auto readResult1 = helper.SendRead(tableName, request1.release()); - { - // upsert to the right and check that lock is not broken - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (9, 0, 0, 900); - )"); + // upsert to the left and check that lock is not broken + helper.WriteRowTwin(tableName, {5, 5, 5, 555}, EvWrite); + helper.CheckLockValid(tableName, 2, {11, 11, 11}, lockTxId); - helper.CheckLockValid("table-1", 2, {11, 11, 11}, lockTxId); - } + // upsert to the right and check that lock is not broken + helper.WriteRowTwin(tableName, {9, 0, 0, 900}, EvWrite); + helper.CheckLockValid(tableName, 2, {11, 11, 11}, lockTxId); // breaks lock obtained above // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (8, 1, 1, 8000); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {8, 1, 1, 8000}, EvWrite); + helper.CheckLockBroken(tableName, 3, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyPrefixLeftBorder) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKeyPrefixLeftBorder, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); AddKeyQuery(*request1, {8}); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); // breaks lock obtained above // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (8, 0, 0, 8000); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {8, 0, 0, 8000}, EvWrite); + helper.CheckLockBroken(tableName, 3, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyPrefixRightBorder) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKeyPrefixRightBorder, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); request1->Record.SetLockTxId(lockTxId); AddKeyQuery(*request1, {8}); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); // breaks lock obtained above // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (8, 1, 1, 8000); - )"); - - helper.CheckLockBroken("table-1", 3, {11, 11, 11}, lockTxId, *readResult1); + helper.WriteRowTwin(tableName, {8, 1, 1, 8000}, EvWrite); + helper.CheckLockBroken(tableName, 3, {11, 11, 11}, lockTxId, *readResult1); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyWithContinue) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKeyWithContinue, EvWrite) { TTestHelper helper; const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; - auto request1 = helper.GetBaseReadRequest("table-1", 1); + auto request1 = helper.GetBaseReadRequest(tableName, 1); AddKeyQuery(*request1, {3, 3, 3}); AddKeyQuery(*request1, {1, 1, 1}); AddKeyQuery(*request1, {5, 5, 5}); request1->Record.SetMaxRows(1); request1->Record.SetLockTxId(lockTxId); - auto readResult1 = helper.SendRead("table-1", request1.release()); + auto readResult1 = helper.SendRead(tableName, request1.release()); // breaks lock obtained above // also we modify range: insert new key - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (1, 1, 1, 1000); - )"); + helper.WriteRowTwin(tableName, {1, 1, 1, 1000}, EvWrite); + helper.SendReadAck(tableName, readResult1->Record, 3, 10000); - helper.SendReadAck("table-1", readResult1->Record, 3, 10000); auto readResult2 = helper.WaitReadResult(); UNIT_ASSERT_VALUES_EQUAL(readResult2->Record.BrokenTxLocksSize(), 1UL); @@ -3492,7 +3395,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); } - Y_UNIT_TEST(ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips) { + Y_UNIT_TEST_TWIN(ShouldReturnBrokenLockWhenReadKeyWithContinueInvisibleRowSkips, EvWrite) { // If we read in v1, write in v2, then write breaks lock. // Because of out of order execution, v2 can happen before v1 // and we should properly handle it in DS to break lock. @@ -3504,30 +3407,20 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { {"/Root/movies", "/Root/table-1"}, TDuration::Hours(1)); - // write new data above snapshot - ExecSQL(helper.Server, helper.Sender, R"( - UPSERT INTO `/Root/table-1` - (key1, key2, key3, value) - VALUES - (4, 4, 4, 4444); - )"); - const ui64 lockTxId = 1011121314; + const TString tableName = "table-1"; + + // write new data above snapshot + helper.WriteRowTwin(tableName, {4, 4, 4, 4444}, EvWrite); - auto request1 = helper.GetBaseReadRequest("table-1", 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); + auto request1 = helper.GetBaseReadRequest(tableName, 1, NKikimrDataEvents::FORMAT_ARROW, readVersion); request1->Record.SetLockTxId(lockTxId); request1->Record.SetMaxRows(1); // set quota so that DS hangs waiting for ACK - AddRangeQuery( - *request1, - {1, 1, 1}, - true, - {5, 5, 5}, - true - ); + AddRangeQuery(*request1, {1, 1, 1}, true, {5, 5, 5}, true); - auto readResult1 = helper.SendRead("table-1", request1.release()); - CheckResult(helper.Tables["table-1"].UserTable, *readResult1, { + auto readResult1 = helper.SendRead(tableName, request1.release()); + CheckResult(helper.Tables[tableName].UserTable, *readResult1, { {1, 1, 1, 100}, }); @@ -3535,9 +3428,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.TxLocksSize(), 1); UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.BrokenTxLocksSize(), 0); - helper.SendReadAck("table-1", readResult1->Record, 100, 10000); + helper.SendReadAck(tableName, readResult1->Record, 100, 10000); auto readResult2 = helper.WaitReadResult(); - CheckResult(helper.Tables["table-1"].UserTable, *readResult2, { + CheckResult(helper.Tables[tableName].UserTable, *readResult2, { {3, 3, 3, 300}, {5, 5, 5, 500}, }); @@ -3550,7 +3443,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { UNIT_ASSERT_VALUES_EQUAL(lock.GetLockId(), brokenLock.GetLockId()); UNIT_ASSERT(lock.GetCounter() < brokenLock.GetCounter()); - helper.CheckLockBroken("table-1", 10, {11, 11, 11}, lockTxId, *readResult1); + helper.CheckLockBroken(tableName, 10, {11, 11, 11}, lockTxId, *readResult1); } Y_UNIT_TEST(HandlePersistentSnapshotGoneInContinue) { diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 4737c3ada7a1..b62c8a712455 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -86,6 +86,9 @@ class TValidatedWriteTx: TNonCopyable { NMiniKQL::IEngineFlat* GetEngine() { return EngineBay.GetEngine(); } + NMiniKQL::TEngineHost* GetEngineHost() { + return EngineBay.GetEngineHost(); + } void DestroyEngine() { EngineBay.DestroyEngine(); } diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 641a23124de6..94704128af42 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -4,6 +4,8 @@ #include "datashard_locks_db.h" #include "datashard_user_db.h" +#include + namespace NKikimr { namespace NDataShard { @@ -36,7 +38,7 @@ class TWriteUnit : public TExecutionUnit { } void DoExecute(TDataShard* self, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) { - const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); + TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); const ui64 tableId = writeTx->GetTableId().PathId.LocalPathId; const TTableId fullTableId(self->GetPathOwnerId(), tableId); @@ -51,7 +53,6 @@ class TWriteUnit : public TExecutionUnit { Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId); Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId); - const ui32 writeTableId = localTableId; auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp); writeTx->SetReadVersion(readVersion); writeTx->SetWriteVersion(writeVersion); @@ -59,46 +60,42 @@ class TWriteUnit : public TExecutionUnit { TDataShardUserDb userDb(*self, txc.DB, readVersion); TDataShardChangeGroupProvider groupProvider(*self, txc.DB); - TVector key; - TVector value; - TVector keyCells; + TVector commands; const TSerializedCellMatrix& matrix = writeTx->GetMatrix(); for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) { - key.clear(); keyCells.clear(); keyCells.reserve(TableInfo_.KeyColumnIds.size()); ui64 keyBytes = 0; for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) { - const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx]; const TCell& cell = matrix.GetCell(rowIdx, keyColIdx); - keyBytes += cell.Size(); - key.emplace_back(TRawTypeValue(cell.AsRef(), cellType)); + keyBytes += cell.IsNull() ? 1 : cell.Size(); keyCells.emplace_back(cell); } - value.clear(); + commands.clear(); + Y_ABORT_UNLESS(matrix.GetColCount() >= TableInfo_.KeyColumnIds.size()); + commands.reserve(matrix.GetColCount() - TableInfo_.KeyColumnIds.size()); + + ui64 valueBytes = 0; for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) { ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); const TCell& cell = matrix.GetCell(rowIdx, valueColIdx); - auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1); - Y_ABORT_UNLESS(col); + valueBytes += cell.IsNull() ? 1 : cell.Size(); - value.emplace_back(NTable::TUpdateOp(columnTag, NTable::ECellOp::Set, TRawTypeValue(cell.AsRef(), col->Type))); + NMiniKQL::IEngineFlatHost::TUpdateCommand command = {columnTag, TKeyDesc::EColumnOperation::Set, {}, cell}; + commands.emplace_back(std::move(command)); } - txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); - self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(keyCells, txc.DB); + writeTx->GetEngineHost()->UpdateRow(fullTableId, keyCells, commands); } //TODO: Counters // self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount); // self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, matrix.GetBuffer().size()); - TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now(); - writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(self->TabletID(), writeOp->GetTxId())); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << self->TabletID());