diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
index 00396a250977..0c228730c13a 100644
--- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
+++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
@@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
         return;
     }
 
+    // Highest candidate edge: more writes may still arrive at this version
+    TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
+    bool wait = true; // cleared below when the plan or an uncertain volatile version bounds the edge
+
     if (const auto& plan = TransQueue.GetPlan()) {
-        const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
-        if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
-            return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
-        }
-        return;
+        edge = Min(edge, plan.begin()->ToRowVersion()); // clamp to the version of the first plan entry
+        wait = false;
     }
 
     if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
-        if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
-            return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
-        }
-        return;
+        edge = Min(edge, version); // clamp to the minimum uncertain volatile version
+        wait = false;
     }
 
-    const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
-    if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
-        return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
+    if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
+        return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
     }
 
-    WaitPlanStep(lowest.Next().Step);
+    if (wait) { // only when neither the plan nor a volatile version bounded the edge
+        WaitPlanStep(lowest.Next().Step);
+    }
 }
 
 void TCdcStreamHeartbeatManager::Reset() {
@@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
         return false;
     }
 
-    if (Schedule.top().Version > edge) {
+    if (Schedule.top().Version >= edge) { // an entry scheduled exactly at edge is not ready either
         return false;
     }
 
@@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
 THashMap TCdcStreamHeartbeatManager::EmitHeartbeats(
         NTable::TDatabase& db, const TRowVersion& edge)
 {
-    if (Schedule.empty() || Schedule.top().Version > edge) {
+    if (!ShouldEmitHeartbeat(edge)) { // nothing is ready at this edge
         return {};
     }
 
@@ -234,7 +234,7 @@ THashMap TCdcStreamHeartbea
 
     while (true) {
         const auto& top = Schedule.top();
-        if (top.Version > edge) {
+        if (top.Version >= edge) { // stop at the first entry that is not strictly below edge
             break;
         }
 
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 7664fff85a8d..4991860291da 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
         return result;
     }
 
-    void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector& expected) {
+    TVector WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector& expected) {
         while (true) {
             const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
             for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
@@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
             if (records.size() >= expected.size()) {
                 UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
                     "Unexpected record: " << records.at(expected.size()).second);
-                break;
+                TVector values; // JSON tree of every record, in record order
+                for (const auto& pr : records) {
+                    bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
+                    Y_ABORT_UNLESS(ok); // abort when ReadJsonTree reports failure
+                }
+                return values;
             }
 
             SimulateSleep(server, TDuration::Seconds(1));
@@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
         });
     }
 
+    Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
+        TPortManager portManager;
+        TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+            .SetUseRealThreads(false)
+            .SetDomainName("Root")
+        );
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        auto& runtime = *server->GetRuntime();
+        const auto edgeActor = runtime.AllocateEdgeActor();
+
+        SetupLogging(runtime);
+        InitRoot(server, edgeActor);
+        SetSplitMergePartCountLimit(&runtime, -1);
+        CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+        WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+            WithVirtualTimestamps(WithResolvedTimestamps(
+                TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));
+
+        Cerr << "... prepare" << Endl;
+        WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+            R"({"resolved":"***"})",
+        });
+
+        KqpSimpleExec(runtime, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
+        )");
+
+        auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+            R"({"resolved":"***"})",
+            R"({"update":{"value":10},"key":[1],"ts":"***"})",
+            R"({"resolved":"***"})",
+        });
+
+        // Step component of the last record, which is a resolved timestamp
+        ui64 lastStep = records.back()["resolved"][0].GetUInteger();
+        Cerr << "... last heartbeat at " << lastStep << Endl;
+
+        const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
+        const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+
+        ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+        ui64 snapshotStep = lastStep + 3000 - 1; // presumably one step short of the next 3 s heartbeat (3000 ms) - TODO confirm
+        ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));
+
+        TBlockEvents blockedUpdates(runtime,
+            [&](auto& ev) {
+                return ev->Get()->Record.GetTimeBarrier() > snapshotStep; // matches only time barriers past the snapshot step
+            });
+
+        Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
+        {
+            auto req = std::make_unique();
+            {
+                auto& record = req->Record;
+                record.SetReadId(1);
+                record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
+                record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
+                record.AddColumns(1);
+                record.AddColumns(2);
+                record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
+                ui32 key = 1;
+                TVector keys;
+                keys.push_back(TCell::Make(key));
+                req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
+                record.MutableSnapshot()->SetStep(snapshotStep);
+                record.MutableSnapshot()->SetTxId(Max());
+            }
+            ForwardToTablet(runtime, shards.at(0), edgeActor, req.release());
+            auto ev = runtime.GrabEdgeEventRethrow(edgeActor);
+            auto* res = ev->Get();
+            UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+            UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
+            Cerr << "... read finished" << Endl;
+        }
+        for (int i = 0; i < 10; ++i) { // ten simulated sleeps of 1 ms each
+            runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        }
+
+        Cerr << "... starting upsert 1 (expected to displace)" << Endl;
+        auto upsert1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
+        )");
+        for (int i = 0; i < 10; ++i) {
+            runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        }
+
+        Cerr << "... starting upsert 2 (expected to displace)" << Endl;
+        auto upsert2 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
+        )");
+        for (int i = 0; i < 10; ++i) {
+            runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        }
+
+        Cerr << "... unblocking updates" << Endl;
+        blockedUpdates.Unblock().Stop();
+        for (int i = 0; i < 10; ++i) {
+            runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        }
+
+        Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
+        records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+            R"({"resolved":"***"})",
+            R"({"update":{"value":10},"key":[1],"ts":"***"})",
+            R"({"resolved":"***"})",
+            R"({"update":{"value":20},"key":[2],"ts":"***"})",
+            R"({"update":{"value":30},"key":[3],"ts":"***"})",
+            R"({"resolved":"***"})",
+        });
+
+        TRowVersion resolved(0, 0);
+        for (auto& record : records) {
+            if (record.Has("resolved")) { // remember the latest resolved timestamp seen so far
+                resolved.Step = record["resolved"][0].GetUInteger();
+                resolved.TxId = record["resolved"][1].GetUInteger();
+            }
+            if (record.Has("ts")) { // a change record must be strictly newer than that resolved timestamp
+                TRowVersion ts(
+                    record["ts"][0].GetUInteger(),
+                    record["ts"][1].GetUInteger());
+                UNIT_ASSERT_C(resolved < ts,
+                    "Record with ts " << ts << " after resolved " << resolved);
+            }
+        }
+    }
+
 } // Cdc
 
 } // NKikimr