diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index 3f47905e976f..26aa1f47b496 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -88,8 +88,8 @@ class TAlterCdcStreamUnit : public TExecutionUnit { return EExecutionStatus::DelayCompleteNoMoreRestarts; } - void Complete(TOperation::TPtr, const TActorContext& ctx) override { - DataShard.EmitHeartbeats(ctx); + void Complete(TOperation::TPtr, const TActorContext&) override { + DataShard.EmitHeartbeats(); } }; diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp index 0e9ccbec844f..0560136f72a6 100644 --- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp +++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp @@ -1,9 +1,9 @@ #include "cdc_stream_heartbeat.h" #include "datashard_impl.h" -#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) -#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) -#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[CdcStreamHeartbeat] " << stream) namespace NKikimr::NDataShard { @@ -32,7 +32,7 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_EMIT_HEARTBEATS; } - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + bool Execute(TTransactionContext& txc, const TActorContext&) override { LOG_I("Emit change records" << ": edge# " << Edge << ", at tablet# " << Self->TabletID()); @@ -69,16 +69,16 @@ class TDataShard::TTxCdcStreamEmitHeartbeats: public NTabletFlatExecutor::TTrans return true; } - void Complete(const TActorContext& ctx) override { + void Complete(const TActorContext&) override { LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)" << ": at tablet# " << Self->TabletID()); Self->EnqueueChangeRecords(std::move(ChangeRecords)); - Self->EmitHeartbeats(ctx); + Self->EmitHeartbeats(); } }; // TTxCdcStreamEmitHeartbeats -void TDataShard::EmitHeartbeats(const TActorContext& ctx) { +void TDataShard::EmitHeartbeats() { LOG_D("Emit heartbeats" << ": at tablet# " << TabletID()); @@ -92,15 +92,23 @@ void TDataShard::EmitHeartbeats(const TActorContext& ctx) { } if (const auto& plan = TransQueue.GetPlan()) { - const auto version = plan.begin()->ToRowVersion(); + const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion()); if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, version), ctx); + return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); } + return; + } + + if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) { + if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { + return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); + } + return; } const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite); if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite), ctx); + return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite)); } WaitPlanStep(lowest.Next().Step); diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp index a078bf487c3b..1055068cdfed 100644 --- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp @@ -122,7 +122,7 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op, DataShard.NotifySchemeshard(ctx, op->GetTxId()); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); if (op->HasOutputData()) { const auto& outReadSets = op->OutReadSets(); diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index ef5433e8aba3..0bbf0261f5da 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -77,7 +77,7 @@ class TCreateCdcStreamUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { if (AddSender) { ctx.Send(DataShard.GetChangeSender(), AddSender.Release()); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } } }; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 5433ad33c9f7..f71f7231e516 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2289,7 +2289,7 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo PromoteFollowerReadEdge(); } - EmitHeartbeats(ctx); + EmitHeartbeats(); } void TDataShard::CheckMediatorStateRestored() { diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 93fb4ce452a5..b00bb254ecdb 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -115,7 +115,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) { Self->CreateChangeSender(ctx); Self->EnqueueChangeRecords(std::move(ChangeRecords)); Self->MaybeActivateChangeSender(ctx); - Self->EmitHeartbeats(ctx); + Self->EmitHeartbeats(); if (!Self->ChangesQueue) { if (!Self->ChangeExchangeSplitter.Done()) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index bc3e8a3e4333..7720260dce60 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1877,7 +1877,7 @@ class TDataShard TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() { return CdcStreamHeartbeatManager; } const TCdcStreamHeartbeatManager& GetCdcStreamHeartbeatManager() const { return CdcStreamHeartbeatManager; } - void EmitHeartbeats(const TActorContext& ctx); + void EmitHeartbeats(); template bool PromoteCompleteEdge(Args&&... args) { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 97576e848154..601a8c579ec3 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,4 +1,5 @@ #include +#include "datashard_ut_common_kqp.h" #include #include @@ -22,6 +23,7 @@ namespace NKikimr { using namespace NDataShard; +using namespace NDataShard::NKqpHelpers; using namespace Tests; Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { @@ -1864,11 +1866,13 @@ Y_UNIT_TEST_SUITE(Cdc) { void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector& expected) { while (true) { const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); - if (records.size() == expected.size()) { - for (ui32 i = 0; i < expected.size(); ++i) { - AssertJsonsEqual(records.at(i).second, expected.at(i)); - } + for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) { + AssertJsonsEqual(records.at(i).second, expected.at(i)); + } + if (records.size() >= expected.size()) { + UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(), + "Unexpected record: " << records.at(expected.size()).second); break; } @@ -3157,6 +3161,103 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampsVolatileOutOfOrder) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableDataShardVolatileTransactions(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable()); + CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table1", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table2", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + }); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (1, 10); + UPSERT INTO `/Root/Table2` (key, value) VALUES (2, 20); + )"); + + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":20},"key":[2]})", + R"({"resolved":"***"})", + }); + + // Block readset exchange + std::vector> readSets; + auto blockReadSets = runtime.AddObserver([&](TEvTxProcessing::TEvReadSet::TPtr& ev) { + readSets.emplace_back(ev.Release()); + }); + + // Start a distributed write to both tables + TString sessionId = CreateSessionRPC(runtime, "/Root"); + auto upsertResult = SendRequest( + runtime, + MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (3, 30); + UPSERT INTO `/Root/Table2` (key, value) VALUES (4, 40); + )", sessionId, /* txId */ "", /* commitTx */ true), + "/Root"); + WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets"); + + // Stop blocking further readsets + blockReadSets.Remove(); + + // Start another distributed write to both tables, it should succeed + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, value) VALUES (5, 50); + UPSERT INTO `/Root/Table2` (key, value) VALUES (6, 60); + )"); + + runtime.SimulateSleep(TDuration::Seconds(10)); + + // Unblock readsets + for (auto& ev : readSets) { + runtime.Send(ev.release(), 0, true); + } + readSets.clear(); + + // There should be only one resolved timestamp after out of order writes + WaitForContent(server, edgeActor, "/Root/Table1/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"resolved":"***"})", + R"({"update":{"value":50},"key":[5]})", + R"({"update":{"value":30},"key":[3]})", + R"({"resolved":"***"})", + }); + WaitForContent(server, edgeActor, "/Root/Table2/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":20},"key":[2]})", + R"({"resolved":"***"})", + R"({"update":{"value":60},"key":[6]})", + R"({"update":{"value":40},"key":[4]})", + R"({"resolved":"***"})", + }); + } + Y_UNIT_TEST(SequentialSplitMerge) { TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1); diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index 11e394800cb7..b47187c5daa8 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -80,7 +80,7 @@ class TDirectOpUnit : public TExecutionUnit { void Complete(TOperation::TPtr op, const TActorContext& ctx) override { Pipeline.RemoveCommittingOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); TDirectTransaction* tx = dynamic_cast(op.Get()); Y_ABORT_UNLESS(tx != nullptr); diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 2360bdc40040..7c30cb960093 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -141,7 +141,7 @@ void TFinishProposeUnit::Complete(TOperation::TPtr op, Pipeline.RemoveActiveOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } DataShard.SendRegistrationRequestTimeCast(ctx); diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp index 413f09e0716c..d6c9dffa10fa 100644 --- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -140,7 +140,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext Pipeline.RemoveActiveOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); } DataShard.SendRegistrationRequestTimeCast(ctx); diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 35af1fd255fb..edd5ca149d69 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -610,6 +610,8 @@ namespace NKikimr::NDataShard { Self->PromoteFollowerReadEdge(); } + Self->EmitHeartbeats(); + if (!WaitingSnapshotEvents.empty()) { TVolatileTxInfo* next = !VolatileTxByVersion.empty() ? *VolatileTxByVersion.begin() : nullptr; while (!WaitingSnapshotEvents.empty()) { diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index c469d75b0fd0..15d4065b279c 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -182,7 +182,7 @@ class TWriteUnit : public TExecutionUnit { void Complete(TOperation::TPtr op, const TActorContext& ctx) override { Pipeline.RemoveCommittingOp(op); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); - DataShard.EmitHeartbeats(ctx); + DataShard.EmitHeartbeats(); TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);