From cc95e9606d6a51ce58afe901d9977caef68864e0 Mon Sep 17 00:00:00 2001
From: Aleksei Borzenkov
Date: Tue, 29 Oct 2024 10:53:43 +0000
Subject: [PATCH] Fix excessive read latency during and after shard splits

---
 .../tx/datashard/datashard__read_iterator.cpp | 10 +++
 .../tx/datashard/datashard_ut_common_kqp.h    |  8 ++-
 .../datashard/datashard_ut_read_iterator.cpp  | 72 ++++++++++++++++++-
 3 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 6be261d99bee..230deb29e0f5 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -2645,6 +2645,16 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
         return;
     }
 
+    if (State == TShardState::PreOffline ||
+        State == TShardState::Offline)
+    {
+        replyWithError(
+            Ydb::StatusIds::NOT_FOUND,
+            TStringBuilder() << "Shard " << TabletID() << " finished splitting/merging"
+                << " (node# " << SelfId().NodeId() << " state# " << DatashardStateName(State) << ")");
+        return;
+    }
+
     if (!IsStateNewReadAllowed()) {
         replyWithError(
             Ydb::StatusIds::OVERLOADED,
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 1e4950575184..eaec5b5215a2 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -186,10 +186,14 @@ namespace NKqpHelpers {
         return KqpSimpleExec(runtime, query, true, database);
     }
 
-    inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
+    inline auto KqpSimpleBeginSend(TTestActorRuntime& runtime, TString& sessionId, const TString& query) {
         sessionId = CreateSessionRPC(runtime);
+        return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, /* txId */ {}, false /* commitTx */));
+    }
+
+    inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
         txId.clear();
-        auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)));
+        auto response = AwaitResponse(runtime, KqpSimpleBeginSend(runtime, sessionId, query));
         if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
             return TStringBuilder() << "ERROR: " << response.operation().status();
         }
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 14153b3e8912..e65b10d9b119 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -3970,7 +3970,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
     Y_UNIT_TEST(HandleMvccGoneInContinue) {
         // TODO
     }
-};
+}
 
 Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
     Y_UNIT_TEST(ShouldRead) {
@@ -4054,7 +4054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
 
         UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
     }
-};
+}
 
 Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
     Y_UNIT_TEST(ShouldCalculateQuota) {
@@ -4105,7 +4105,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
         UNIT_ASSERT_VALUES_EQUAL(state.Quota.Bytes, 131729);
         UNIT_ASSERT(state.State == NDataShard::TReadIteratorState::EState::Executing);
     }
-};
+}
 
 Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
     Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) {
@@ -4755,4 +4755,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
 }
 
 
+Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) {
+
+    Y_UNIT_TEST(ReadSplitLatency) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false);
+        TServer::TPtr server = new TServer(serverSettings);
+
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+        // Insert initial data
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
+
+        // Copy table (this will ensure original shards stay alive after split)
+        {
+            auto senderCopy = runtime.AllocateEdgeActor();
+            ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1");
+            WaitTxNotification(server, senderCopy, txId);
+        }
+
+        TBlockEvents<TEvDataShard::TEvRead> blockedReads(runtime);
+
+        Cerr << "... starting read from table-1" << Endl;
+        TString readSessionId;
+        auto readFuture = KqpSimpleBeginSend(runtime, readSessionId, R"(
+            SELECT * FROM `/Root/table-1` ORDER BY key;
+            )");
+
+        runtime.WaitFor("blocked TEvRead", [&]{ return blockedReads.size() >= 1; });
+
+        {
+            Cerr << "... splitting table-1" << Endl;
+            SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+            auto shards1before = GetTableShards(server, sender, "/Root/table-1");
+            ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 5);
+            Cerr << "... split txId# " << txId << " started" << Endl;
+            WaitTxNotification(server, sender, txId);
+            Cerr << "... split txId# " << txId << " finished" << Endl;
+        }
+
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+
+        auto readStartTs = runtime.GetCurrentTime();
+        blockedReads.Unblock();
+        blockedReads.Stop();
+        auto readResponse = runtime.WaitFuture(std::move(readFuture));
+        UNIT_ASSERT_VALUES_EQUAL(readResponse.operation().status(), Ydb::StatusIds::SUCCESS);
+        auto readLatency = runtime.GetCurrentTime() - readStartTs;
+        Cerr << "... read latency was " << readLatency << Endl;
+        UNIT_ASSERT_C(readLatency < TDuration::MilliSeconds(100),
+            "unexpected read latency " << readLatency);
+    }
+
+}
+
 } // namespace NKikimr
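
How the fix works: the new branch in TDataShard::Handle(TEvDataShard::TEvRead) answers reads sent to a shard in PreOffline/Offline state with NOT_FOUND. Without it the request presumably fell through to the IsStateNewReadAllowed() check and got OVERLOADED, which clients treat as "the same shard may recover, retry with backoff"; NOT_FOUND signals "this shard is gone for good" (it finished splitting/merging), so the client can re-resolve the table's partitioning and retry immediately instead of backing off against a dead shard. Below is a minimal, self-contained C++ sketch of that client-side distinction under stated assumptions: EStatus, SendReadToShard, and ResolveShards are illustrative stand-ins, not YDB's actual retry code.

#include <chrono>
#include <cstdio>
#include <thread>

// Hypothetical status values standing in for the Ydb::StatusIds seen in the patch.
enum class EStatus { Success, Overloaded, NotFound };

// Assumed helper: simulates a shard that finished splitting. With the patch it
// answers NotFound; the retry loop then re-resolves and succeeds on attempt 1.
EStatus SendReadToShard(int attempt) {
    return attempt == 0 ? EStatus::NotFound : EStatus::Success;
}

// Assumed helper: refreshes the client's view of the table's partitioning.
void ResolveShards() {
    std::puts("re-resolving table partitioning");
}

int main() {
    using namespace std::chrono_literals;
    auto backoff = 10ms;
    for (int attempt = 0;; ++attempt) {
        switch (SendReadToShard(attempt)) {
            case EStatus::Success:
                std::puts("read succeeded");
                return 0;
            case EStatus::Overloaded:
                // Retryable: the same shard may recover, so wait before retrying.
                std::this_thread::sleep_for(backoff);
                backoff *= 2;
                break;
            case EStatus::NotFound:
                // Shard is permanently gone: refresh routing and retry at once.
                // Skipping the backoff here is what keeps post-split reads fast.
                ResolveShards();
                break;
        }
    }
}

The new ReadSplitLatency test checks exactly this end-to-end effect: it blocks the TEvRead, completes a split, then asserts the unblocked read finishes in under 100ms, i.e. without a backoff-and-retry round against the retired shard.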