diff --git a/ydb/core/base/feature_flags.h b/ydb/core/base/feature_flags.h index 262939d0fc0b..626ccad3f674 100644 --- a/ydb/core/base/feature_flags.h +++ b/ydb/core/base/feature_flags.h @@ -13,17 +13,6 @@ class TFeatureFlags: public NKikimrConfig::TFeatureFlags { using TBase::TBase; using TBase::operator=; - inline std::optional GetEnableMvcc() const { - switch (TBase::GetEnableMvcc()) { - case NKikimrConfig::TFeatureFlags::UNSET: - return std::nullopt; - case NKikimrConfig::TFeatureFlags::VALUE_TRUE: - return true; - case NKikimrConfig::TFeatureFlags::VALUE_FALSE: - return false; - } - } - inline void SetEnableBackgroundCompactionForTest(bool value) { SetEnableBackgroundCompaction(value); } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 0b9674e3df88..2a4d9e7cfc42 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -182,8 +182,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseEnableMvccSnapshotWithLegacyDomainRoot) && - AppData()->FeatureFlags.GetEnableMvccSnapshotReads() + (!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) ); return forceSnapshot; @@ -1272,6 +1271,11 @@ class TKqpDataExecuter : public TKqpExecuterBaseTabletId << " lost pipe while waiting for reply" << (msg->NotDelivered ? " (last message not delivered)" : "")); + if (ReadOnlyTx && msg->NotDelivered) { + CancelProposal(msg->TabletId); + return ReplyUnavailable(TStringBuilder() << "Could not deliver program to shard " << msg->TabletId); + } + return ReplyTxStateUnknown(msg->TabletId); } diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index ec5accc1b0e2..b419125c66be 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -123,10 +123,9 @@ TKqpTransactionInfo TKqpTransactionContext::GetInfo() const { bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx, bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery) { - if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) - return false; + Y_UNUSED(config); - if (!config.FeatureFlags.GetEnableMvccSnapshotReads()) + if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) return false; if (txCtx.GetSnapshot().IsValid()) diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index fce52247bd30..2f05c0d60834 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -178,10 +178,6 @@ class TKikimrRunner { return GetTestServer().GetRuntime()->WaitFuture(future); } - bool IsUsingSnapshotReads() const { - return Server->GetRuntime()->GetAppData().FeatureFlags.GetEnableMvccSnapshotReads(); - } - private: void Initialize(const TKikimrSettings& settings); void WaitForKqpProxyInit(); diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index e9bd4c7eedcb..d79e0f3440a1 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -856,14 +856,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { SELECT * FROM `/Root/TwoShard` WHERE Key = 2; )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); - if (kikimr.IsUsingSnapshotReads()) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } else { - UNIT_ASSERT(!result.IsSuccess()); - result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } Y_UNIT_TEST(LocksMultiShard) { @@ -888,14 +881,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { SELECT * FROM `/Root/EightShard`; )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); - if (kikimr.IsUsingSnapshotReads()) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } else { - UNIT_ASSERT(!result.IsSuccess()); - result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } Y_UNIT_TEST(LocksMultiShardOk) { @@ -1016,13 +1002,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { SELECT 42; )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); - if (kikimr.IsUsingSnapshotReads()) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } else { - Cerr << result.GetIssues().ToString() << Endl; - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } Y_UNIT_TEST(BrokenLocksAtROTxSharded) { @@ -1050,13 +1030,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { SELECT 42; )", TTxControl::Tx(*tx).CommitTx()).GetValueSync(); - if (kikimr.IsUsingSnapshotReads()) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } else { - Cerr << result.GetIssues().ToString() << Endl; - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } Y_UNIT_TEST(BrokenLocksOnUpdate) { diff --git a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp index abc0c2ed2738..d9b77c2cf891 100644 --- a/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_tx_ut.cpp @@ -357,12 +357,7 @@ Y_UNIT_TEST_SUITE(KqpTx) { } auto commitResult = tx->Commit().ExtractValueSync(); - if (kikimr.IsUsingSnapshotReads()) { - UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString()); - } else { - UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); - UNIT_ASSERT(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED)); - } + UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString()); } Y_UNIT_TEST(EmptyTxOnCommit) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 62af1c8b2677..bb6323000edd 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1112,13 +1112,14 @@ message TImmediateControlsConfig { MaxValue: 134217728, DefaultValue: 0 }]; + // Note: these settings are deprecated (always enabled) optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = { - Description: "Enables prioritized mvcc snapshot reads over immediate writes", + Description: "Enables prioritized mvcc snapshot reads over immediate writes (deprecated, always enabled)", MinValue: 0, MaxValue: 1, DefaultValue: 1 }]; optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = { - Description: "Enables unprotected (fully readonly) mvcc snapshot reads", + Description: "Enables unprotected (fully readonly) mvcc snapshot reads (deprecated, always enabled)", MinValue: 0, MaxValue: 1, DefaultValue: 1 }]; diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index bf4af84de48b..79a55c2189b8 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -54,8 +54,8 @@ message TFeatureFlags { optional bool EnableKqpScanOverPersistentSnapshot = 43 [default = true]; // deprecated: always true optional bool EnableOlapSchemaOperations = 44 [default = true]; optional bool EnableVPatch = 45 [default = true]; - optional bool EnableMvccSnapshotReads = 46 [default = true]; - optional Tribool EnableMvcc = 47 [default = VALUE_TRUE]; + optional bool EnableMvccSnapshotReads = 46 [default = true]; // deprecated: always true + optional Tribool EnableMvcc = 47 [default = VALUE_TRUE]; // deprecated: always true optional bool EnableSchemeTransactionsAtSchemeShard = 48 [default = true]; optional bool EnableArrowFormatAtDatashard = 49 [default = false]; optional bool Enable3x3RequestsForMirror3DCMinLatencyPut = 50 [default = false]; diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 32b44d6c7193..94ebd910a20d 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -26,7 +26,6 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableDataColumnForIndexTable) FEATURE_FLAG_SETTER(EnableClockGettimeForUserCpuAccounting) FEATURE_FLAG_SETTER(EnableOlapSchemaOperations) - FEATURE_FLAG_SETTER(EnableMvccSnapshotReads) FEATURE_FLAG_SETTER(EnableBackgroundCompaction) FEATURE_FLAG_SETTER(EnableBackgroundCompactionServerless) FEATURE_FLAG_SETTER(EnableBorrowedSplitCompaction) diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 6ac1352f8cfa..c251597f7ce7 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -151,8 +151,6 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , BackupReadAheadHi(0, 0, 128*1024*1024) , TtlReadAheadLo(0, 0, 64*1024*1024) , TtlReadAheadHi(0, 0, 128*1024*1024) - , EnablePrioritizedMvccSnapshotReads(1, 0, 1) - , EnableUnprotectedMvccSnapshotReads(1, 0, 1) , EnableLockedWrites(1, 0, 1) , MaxLockedWritesPerKey(1000, 0, 1000000) , EnableLeaderLeases(1, 0, 1) @@ -325,8 +323,6 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(TtlReadAheadLo, "DataShardControls.TtlReadAheadLo"); appData->Icb->RegisterSharedControl(TtlReadAheadHi, "DataShardControls.TtlReadAheadHi"); - appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads"); - appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads"); appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites"); appData->Icb->RegisterSharedControl(MaxLockedWritesPerKey, "DataShardControls.MaxLockedWritesPerKey"); @@ -2098,27 +2094,20 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge break; case EPromotePostExecuteEdges::RepeatableRead: { - bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads(); - if (unprotectedReads) { - // We want to use unprotected reads, but we need to make sure it's properly marked first - if (!SnapshotManager.GetPerformedUnprotectedReads()) { - SnapshotManager.SetPerformedUnprotectedReads(true, txc); - res.HadWrites = true; - } - if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) { - // We need to wait for completion until the flag is committed - res.WaitCompletion = true; - } - SnapshotManager.PromoteUnprotectedReadEdge(version); - } else if (SnapshotManager.GetPerformedUnprotectedReads()) { - // We want to drop the flag as soon as possible - SnapshotManager.SetPerformedUnprotectedReads(false, txc); + // We want to use unprotected reads, but we need to make sure it's properly marked first + if (!SnapshotManager.GetPerformedUnprotectedReads()) { + SnapshotManager.SetPerformedUnprotectedReads(true, txc); res.HadWrites = true; } + if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) { + // We need to wait for completion until the flag is committed + res.WaitCompletion = true; + } + SnapshotManager.PromoteUnprotectedReadEdge(version); // We want to promote the complete edge when protected reads are // used or when we're already writing something anyway. - if (res.HadWrites || !unprotectedReads) { + if (res.HadWrites) { res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc); if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) { // We need to wait for completion because some other transaction @@ -3333,7 +3322,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr auto& rec = msg->Record; if (rec.HasMvccSnapshot()) { TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId()); - TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads()); + TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(); if (rowVersion >= unreadableEdge) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge); LWTRACK(ProposeTransactionWaitSnapshot, msg->Orbit, rowVersion.Step, rowVersion.TxId); diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 065234b1f864..93fb4ce452a5 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -589,13 +589,11 @@ class TDataShard::TTxInitSchema : public TTransactionBase { Self->PersistSys(db, Schema::Sys_State, Self->State); - if (AppData(ctx)->FeatureFlags.GetEnableMvcc()) { - auto state = *AppData(ctx)->FeatureFlags.GetEnableMvcc() ? EMvccState::MvccEnabled : EMvccState::MvccDisabled; - Self->PersistSys(db, Schema::SysMvcc_State, (ui32)state); + auto state = EMvccState::MvccEnabled; + Self->PersistSys(db, Schema::SysMvcc_State, (ui32)state); - LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, TStringBuilder() << "TxInitSchema.Execute" - << " MVCC state switched to" << (*AppData(ctx)->FeatureFlags.GetEnableMvcc() ? " enabled" : " disabled") << " state"); - } + LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, TStringBuilder() << "TxInitSchema.Execute" + << " MVCC state switched to enabled state"); Self->MvccSwitchState = TSwitchState::DONE; } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 8dca4a6ddc6b..80d3143ef83c 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -2026,8 +2026,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB snapshotUnavailable = true; } } else { - auto prioritizedMvccSnapshotReads = Self->GetEnablePrioritizedMvccSnapshotReads(); - TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads); + TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge(); if (state.ReadVersion >= unreadableEdge) { LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId); Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(Ev), ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c9becaedfcee..2d5ff957122f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1646,16 +1646,6 @@ class TDataShard return TtlReadAheadHi; } - bool GetEnablePrioritizedMvccSnapshotReads() const { - ui64 value = EnablePrioritizedMvccSnapshotReads; - return value != 0; - } - - bool GetEnableUnprotectedMvccSnapshotReads() const { - ui64 value = EnableUnprotectedMvccSnapshotReads; - return value != 0; - } - bool GetEnableLockedWrites() const { ui64 value = EnableLockedWrites; return value != 0; @@ -2649,8 +2639,6 @@ class TDataShard TControlWrapper TtlReadAheadLo; TControlWrapper TtlReadAheadHi; - TControlWrapper EnablePrioritizedMvccSnapshotReads; - TControlWrapper EnableUnprotectedMvccSnapshotReads; TControlWrapper EnableLockedWrites; TControlWrapper MaxLockedWritesPerKey; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 8d8eec8f9d09..20e69bf2c762 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1918,14 +1918,13 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co if (Self->MvccSwitchState == TSwitchState::SWITCHING) { WaitingDataTxOps.emplace(TRowVersion::Min(), IEventHandle::Upcast(std::move(ev))); // postpone tx processing till mvcc state switch is finished } else { - bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); Y_DEBUG_ABORT_UNLESS(ev->Get()->Record.HasMvccSnapshot()); TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId()); WaitingDataTxOps.emplace(snapshot, IEventHandle::Upcast(std::move(ev))); - const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1; + const ui64 waitStep = snapshot.Step; TRowVersion unreadableEdge; - if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) { - ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op + if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge())) { + ActivateWaitingTxOps(unreadableEdge, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op } } @@ -1941,7 +1940,7 @@ bool TPipeline::AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev) { return true; } -void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) { +void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ActivateWaitingTxOps for version# " << edge << ", txOps: " << (WaitingDataTxOps.empty() ? "empty" : ToString(WaitingDataTxOps.begin()->first.Step)) << ", readIterators: " @@ -1976,8 +1975,8 @@ void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, co } if (minWait == TRowVersion::Max() || - Self->WaitPlanStep(prioritizedReads ? minWait.Step : minWait.Step + 1) || - minWait >= (edge = GetUnreadableEdge(prioritizedReads))) + Self->WaitPlanStep(minWait.Step) || + minWait >= (edge = GetUnreadableEdge())) { break; } @@ -1995,8 +1994,7 @@ void TPipeline::ActivateWaitingTxOps(const TActorContext& ctx) { if (isEmpty || Self->MvccSwitchState == TSwitchState::SWITCHING) return; - bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); - ActivateWaitingTxOps(GetUnreadableEdge(prioritizedReads), prioritizedReads, ctx); + ActivateWaitingTxOps(GetUnreadableEdge(), ctx); } void TPipeline::AddWaitingReadIterator( @@ -2016,12 +2014,11 @@ void TPipeline::AddWaitingReadIterator( auto readId = ev->Get()->Record.GetReadId(); WaitingDataReadIterators.emplace(version, std::move(ev)); - bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads(); - const ui64 waitStep = prioritizedReads ? version.Step : version.Step + 1; + const ui64 waitStep = version.Step; TRowVersion unreadableEdge = TRowVersion::Min(); - if (!Self->WaitPlanStep(waitStep) && version < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) { + if (!Self->WaitPlanStep(waitStep) && version < (unreadableEdge = GetUnreadableEdge())) { // Async MediatorTimeCastEntry update, need to reschedule transaction - ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); + ActivateWaitingTxOps(unreadableEdge, ctx); } LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " put read iterator# " << readId @@ -2075,7 +2072,7 @@ TRowVersion TPipeline::GetReadEdge() const { return TRowVersion(step, Max()); } -TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const { +TRowVersion TPipeline::GetUnreadableEdge() const { const auto last = TRowVersion( GetLastActivePlannedOpStep(), GetLastActivePlannedOpId()); @@ -2109,17 +2106,10 @@ TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const { // distributed transactions up to the end of that step. const TRowVersion mediatorEdge(mediatorStep, ::Max()); - if (prioritizeReads) { - // We are prioritizing reads, and we are ok with blocking immediate writes - // in the current step. So the first unreadable version is actually in - // the next step. - return mediatorEdge.Next(); - } else { - // We cannot block immediate writes up to this edge, thus we actually - // need to wait until the edge progresses above this version. This - // would happen when mediator timecast moves to the next step. - return mediatorEdge; - } + // We are prioritizing reads, and we are ok with blocking immediate writes + // in the current step. So the first unreadable version is actually in + // the next step. + return mediatorEdge.Next(); } void TPipeline::AddCompletingOp(const TOperation::TPtr& op) { diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 86095eb04496..ed06f2c0362c 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -352,7 +352,7 @@ class TPipeline : TNonCopyable { bool CheckInflightLimit() const; bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); bool AddWaitingTxOp(NEvents::TDataEvents::TEvWrite::TPtr& ev); - void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx); + void ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx); void ActivateWaitingTxOps(const TActorContext& ctx); ui64 WaitingReadIterators() const { return WaitingDataReadIterators.size(); } @@ -366,7 +366,7 @@ class TPipeline : TNonCopyable { bool HandleWaitingReadIterator(const TReadIteratorId& readId, TEvDataShard::TEvRead* event); TRowVersion GetReadEdge() const; - TRowVersion GetUnreadableEdge(bool prioritizedReads) const; + TRowVersion GetUnreadableEdge() const; void AddCompletingOp(const TOperation::TPtr& op); void RemoveCompletingOp(const TOperation::TPtr& op); diff --git a/ydb/core/tx/datashard/datashard_switch_mvcc_state.cpp b/ydb/core/tx/datashard/datashard_switch_mvcc_state.cpp index 289250bf52a8..9bd42ebe7fad 100644 --- a/ydb/core/tx/datashard/datashard_switch_mvcc_state.cpp +++ b/ydb/core/tx/datashard/datashard_switch_mvcc_state.cpp @@ -15,8 +15,8 @@ void TDataShard::CheckMvccStateChangeCanStart(const TActorContext& ctx) { case TShardState::Ready: case TShardState::Frozen: { - const auto enable = AppData(ctx)->FeatureFlags.GetEnableMvcc(); - if (enable && *enable != IsMvccEnabled()) { + if (!IsMvccEnabled()) { + // Force enable mvcc for potential old shards MvccSwitchState = TSwitchState::SWITCHING; } else { MvccSwitchState = TSwitchState::DONE; @@ -80,11 +80,10 @@ bool TDataShard::TTxExecuteMvccStateChange::Execute(TTransactionContext& txc, co Y_ABORT_UNLESS(Self->TxInFly() == 0 && Self->ImmediateInFly() == 0); auto [step, txId] = Self->LastCompleteTxVersion(); - Self->SnapshotManager.ChangeMvccState(step, txId, txc, - *AppData(ctx)->FeatureFlags.GetEnableMvcc() ? EMvccState::MvccEnabled : EMvccState::MvccDisabled); + Self->SnapshotManager.ChangeMvccState(step, txId, txc, EMvccState::MvccEnabled); LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, TStringBuilder() << "TTxExecuteMvccStateChange.Execute" - << " MVCC state switched to" << (*AppData(ctx)->FeatureFlags.GetEnableMvcc() ? " enabled" : " disabled") << " state"); + << " MVCC state switched to enabled state"); ActivateWaitingOps = true; } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp index cb13af2aac62..3ef5e85fba60 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp @@ -34,14 +34,11 @@ bool HasIssue(const TIssues& issues, ui32 code, TStringBuf message, std::functio class TLocalFixture { public: - TLocalFixture(bool disableSnaphots = false) { + TLocalFixture() { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); TServerSettings serverSettings(pm.GetPort(2134)); - if (disableSnaphots) { - serverSettings.SetEnableMvccSnapshotReads(false); - } serverSettings.SetDomainName("Root") .SetNodeCount(2) .SetUseRealThreads(false) @@ -229,7 +226,7 @@ Y_UNIT_TEST(ProposeError) { } Y_UNIT_TEST(ProposeRequestUndelivered) { - TLocalFixture fixture(true); + TLocalFixture fixture; auto mitm = [&](TAutoPtr &ev) { if (ev->GetTypeRewrite() == TEvPipeCache::TEvForward::EventType) { auto forwardEvent = ev.Get()->Get(); @@ -301,24 +298,6 @@ void TestProposeResultLost(TTestActorRuntime& runtime, TActorId client, const TS fn(record); } -Y_UNIT_TEST(ProposeResultLost_RoTx) { - TLocalFixture fixture(true); - TestProposeResultLost(*fixture.Runtime, fixture.Client, - Q_("select * from `/Root/table-1`"), - [](const NKikimrKqp::TEvQueryResponse& record) { - UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE, record.DebugString()); - - TIssues issues; - IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); - UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - "Kikimr cluster or one of its subsystems was unavailable."), record.GetResponse().DebugString()); - - UNIT_ASSERT_C(HasIssue(issues, NKikimrIssues::TIssuesIds::TX_STATE_UNKNOWN, "", [] (const TIssue& issue) { - return issue.GetMessage().StartsWith("Tx state unknown for shard "); - }), record.GetResponse().DebugString()); - }); -} - Y_UNIT_TEST(ProposeResultLost_RwTx) { TLocalFixture fixture; TestProposeResultLost(*fixture.Runtime, fixture.Client, diff --git a/ydb/core/tx/datashard/datashard_ut_minstep.cpp b/ydb/core/tx/datashard/datashard_ut_minstep.cpp index c97617bb4e7e..ac7c6d280415 100644 --- a/ydb/core/tx/datashard/datashard_ut_minstep.cpp +++ b/ydb/core/tx/datashard/datashard_ut_minstep.cpp @@ -66,14 +66,11 @@ TAutoPtr EjectDataPropose(TServer::TPtr server, ui64 dataShard) } Y_UNIT_TEST_SUITE(TDataShardMinStepTest) { - void TestDropTablePlanComesNotTooEarly(const TString& query, Ydb::StatusIds::StatusCode expectedStatus, bool disableSnaphots = true) { + void TestDropTablePlanComesNotTooEarly(const TString& query, Ydb::StatusIds::StatusCode expectedStatus) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); TServerSettings serverSettings(pm.GetPort(2134)); - if (disableSnaphots) { - serverSettings.SetEnableMvccSnapshotReads(false); - } serverSettings.SetDomainName("Root") .SetUseRealThreads(false) .SetAppConfig(app); @@ -140,13 +137,6 @@ Y_UNIT_TEST_SUITE(TDataShardMinStepTest) { WaitTabletBecomesOffline(server, shard2); } - Y_UNIT_TEST(TestDropTablePlanComesNotTooEarlyRO) { - TestDropTablePlanComesNotTooEarly( - "SELECT * FROM `/Root/table-1`; SELECT * FROM `/Root/table-2`;", - Ydb::StatusIds::UNAVAILABLE - ); - } - Y_UNIT_TEST(TestDropTablePlanComesNotTooEarlyRW) { TestDropTablePlanComesNotTooEarly( "UPSERT INTO `/Root/table-2` (key, value) SELECT key, value FROM `/Root/table-1`;", @@ -383,14 +373,11 @@ Y_UNIT_TEST_SUITE(TDataShardMinStepTest) { TestAlterProposeRebootMinStep(ERebootOnPropose::SchemeShard); } - void TestDropTableCompletesQuickly(const TString& query, Ydb::StatusIds::StatusCode expectedStatus, bool disableSnaphots = false) { + void TestDropTableCompletesQuickly(const TString& query, Ydb::StatusIds::StatusCode expectedStatus) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); TServerSettings serverSettings(pm.GetPort(2134)); - if (disableSnaphots) { - serverSettings.SetEnableMvccSnapshotReads(false); - } serverSettings.SetDomainName("Root") .SetUseRealThreads(false) .SetAppConfig(app); @@ -483,14 +470,6 @@ Y_UNIT_TEST_SUITE(TDataShardMinStepTest) { WaitTabletBecomesOffline(server, shard2); } - Y_UNIT_TEST(TestDropTableCompletesQuicklyRO) { - TestDropTableCompletesQuickly( - "SELECT * FROM `/Root/table-1` UNION ALL SELECT * FROM `/Root/table-2`;", - Ydb::StatusIds::SUCCESS, - true - ); - } - Y_UNIT_TEST(TestDropTableCompletesQuicklyRW) { TestDropTableCompletesQuickly( "UPSERT INTO `/Root/table-2` (key, value) SELECT key, value FROM `/Root/table-1`;", diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 334084d1a4ab..194f826743e6 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1313,95 +1313,6 @@ Y_UNIT_TEST(TestDelayedTxWaitsForWriteActiveTxOnly) { } } -Y_UNIT_TEST(TestOnlyDataTxLagCausesRejects) { - TPortManager pm; - NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetEnableMvccSnapshotReads(false); - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetAppConfig(app) - // Note: this test currently relies on distributed non-mvcc reads, - // which are deprecated. This test should probably be removed. - .SetEnableDataShardVolatileTransactions(false); - - Tests::TServer::TPtr server = new TServer(serverSettings); - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - TAutoPtr handle; - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::MINIKQL_ENGINE, NActors::NLog::PRI_DEBUG); - - const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions(); - - InitRoot(server, sender); - - CreateShardedTable(server, sender, "/Root", "table-1", 2); - //auto shards = GetTableShards(server, sender, "/Root/table-1"); - - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 3000000001), (3000000003, 3)")); - - // Send ReadTable requests and wait until they hang waiting for quota. - for (int i = 0; i < 2; ++i) { - auto *req = new TEvTxUserProxy::TEvProposeTransaction; - req->Record.SetStreamResponse(true); - auto &tx = *req->Record.MutableTransaction()->MutableReadTableTransaction(); - tx.SetPath("/Root/table-1"); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, req)); - runtime.GrabEdgeEventRethrow(handle); - } - - // Now move time forward and check we can still execute data txs. - runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::Minutes(10)); - // Wait for mediator timecast. - { - TDispatchOptions options; - options.FinalEvents.emplace_back(TEvMediatorTimecast::EvUpdate, 1); - runtime.DispatchEvents(options); - } - - ExecSQL(server, sender, Q_("SELECT COUNT(*) FROM `/Root/table-1`")); - - // Send SQL request which should hang due to lost RS. - TVector> readSets; - auto captureRS = [&](TAutoPtr &event) -> auto { - if (event->GetTypeRewrite() == TEvTxProcessing::EvReadSet) { - readSets.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - runtime.SetObserverFunc(captureRS); - - SendSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) SELECT value, key FROM `/Root/table-1`")); - - const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; - { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return readSets.size() >= expectedReadSets; - }); - runtime.DispatchEvents(options); - UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets); - } - - // Now move time forward and check we can still execute data txs. - runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::Minutes(10)); - // Wait for mediator timecast. - { - TDispatchOptions options; - options.FinalEvents.emplace_back(TEvMediatorTimecast::EvUpdate, 1); - runtime.DispatchEvents(options); - } - - ExecSQL(server, sender, Q_("SELECT COUNT(*) FROM `/Root/table-1`"), true, Ydb::StatusIds::OVERLOADED); -} - } Y_UNIT_TEST_SUITE(DataShardOutOfOrder) { @@ -1535,143 +1446,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, StreamLookup) { } } -Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) { - TPortManager pm; - NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); - TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetEnableMvccSnapshotReads(false); - serverSettings.SetDomainName("Root") - .SetAppConfig(app) - .SetUseRealThreads(false); - - Tests::TServer::TPtr server = new TServer(serverSettings); - auto &runtime = *server->GetRuntime(); - - auto sender = runtime.AllocateEdgeActor(); - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); - - const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions(); - - InitRoot(server, sender); - - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); - - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2);")); - - TString sessionId = CreateSessionRPC(runtime); - TString simpleSession = CreateSessionRPC(runtime); - - TString txId; - { - auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( - SELECT * FROM `/Root/table-1` - UNION ALL - SELECT * FROM `/Root/table-2` - ORDER BY key)")); - UNIT_ASSERT_VALUES_EQUAL( - result, - "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " - "{ items { uint32_value: 2 } items { uint32_value: 2 } }"); - } - - // Capture and block all readset messages - TVector> readSets; - auto captureRS = [&](TAutoPtr &event) -> auto { - if (event->GetTypeRewrite() == TEvTxProcessing::EvReadSet) { - readSets.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(captureRS); - - // Send a commit request, it would block on readset exchange - auto f2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); - UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2);)"), sessionId, txId, true)); - - // Wait until we captured both readsets - const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; - { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return readSets.size() >= expectedReadSets; - }); - runtime.DispatchEvents(options); - UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets); - } - runtime.SetObserverFunc(prevObserverFunc); - - // it will be blocked by previous transaction that is waiting for its readsets - auto simpleF = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - $rows = ( - SELECT * FROM `/Root/table-1` WHERE key = 3 OR key = 5 - UNION ALL - SELECT * FROM `/Root/table-2` WHERE key = 4 OR key = 6 - ); - SELECT key, value FROM $rows ORDER BY key)"), simpleSession, "", true)); - - // wait for the tx is planned - TDispatchOptions opts; - opts.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTxProcessing::EEv::EvPlanStep, 2)); - runtime.DispatchEvents(opts); - - { - // despite it's writing into the key that previous transaction reads this write should finish successfully - auto result = KqpSimpleExec(runtime, Q_(R"( - UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 10); - UPSERT INTO `/Root/table-2` (key, value) VALUES (6, 10))")); - UNIT_ASSERT_VALUES_EQUAL(result, ""); - } - - // resend readsets, it will unblock both commit tx and read - for (auto& ev : readSets) { - runtime.Send(ev.Release(), 0, /* via actor system */ true); - } - readSets.clear(); - - // Read the commit reply next, it must succeed - { - auto response = AwaitResponse(runtime, f2); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - } - - { - // Read should finish successfully and it doesn't see the write - auto response = AwaitResponse(runtime, simpleF); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - Ydb::Table::ExecuteQueryResult result; - response.operation().result().UnpackTo(&result); - UNIT_ASSERT_VALUES_EQUAL( - FormatResult(result), - "{ items { uint32_value: 3 } items { uint32_value: 2 } }, " - "{ items { uint32_value: 4 } items { uint32_value: 2 } }"); - } - - { - // Now we see the write - auto result = KqpSimpleExec(runtime, Q_(R"( - $rows = ( - SELECT * FROM `/Root/table-1` WHERE key = 3 OR key = 5 - UNION ALL - SELECT * FROM `/Root/table-2` WHERE key = 4 OR key = 6 - ); - SELECT key, value FROM $rows ORDER BY key)")); - TString expected = "{ items { uint32_value: 3 } items { uint32_value: 2 } }, " - "{ items { uint32_value: 4 } items { uint32_value: 2 } }, " - "{ items { uint32_value: 5 } items { uint32_value: 10 } }, " - "{ items { uint32_value: 6 } items { uint32_value: 10 } }"; - UNIT_ASSERT_VALUES_EQUAL(result, expected); - } -} - Y_UNIT_TEST_TWIN(TestOutOfOrderReadOnlyAllowed, StreamLookup) { TPortManager pm; NKikimrConfig::TAppConfig app; @@ -1893,7 +1667,6 @@ Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) { NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetEnableMvccSnapshotReads(false); serverSettings.SetDomainName("Root") .SetAppConfig(app) .SetUseRealThreads(false); @@ -2346,411 +2119,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNoBarrierRestartImmediateLongTail, StreamLookup) } } -Y_UNIT_TEST(TestCopyTableNoDeadlock) { - TPortManager pm; - NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetEnableMvccSnapshotReads(false); - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetAppConfig(app); - - Tests::TServer::TPtr server = new TServer(serverSettings); - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); - - const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions(); - - InitRoot(server, sender); - - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); - - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); - - TString sessionId = CreateSessionRPC(runtime); - TString sessionRead = CreateSessionRPC(runtime); - - TString txId; - { - auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( - SELECT * FROM `/Root/table-1` - UNION ALL - SELECT * FROM `/Root/table-2` - ORDER BY key)")); - UNIT_ASSERT_VALUES_EQUAL( - result, - "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " - "{ items { uint32_value: 2 } items { uint32_value: 1 } }"); - } - - // Capture and block all readset messages - TVector> readSets; - TVector> txProposes; - size_t seenPlanSteps = 0; - bool captureReadSets = true; - bool captureTxProposes = false; - auto captureRS = [&](TAutoPtr &event) -> auto { - switch (event->GetTypeRewrite()) { - case TEvTxProcessing::EvPlanStep: - Cerr << "---- observed EvPlanStep ----" << Endl; - ++seenPlanSteps; - break; - case TEvTxProcessing::EvReadSet: - Cerr << "---- observed EvReadSet ----" << Endl; - if (captureReadSets) { - readSets.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - case TEvTxProxy::EvProposeTransaction: - Cerr << "---- observed EvProposeTransaction ----" << Endl; - if (captureTxProposes) { - txProposes.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - default: - break; - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(captureRS); - - // Send a commit request, it would block on readset exchange - auto fCommit = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2); - UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true)); - - // Wait until we captured both readsets - const size_t expectedReadSets = usesVolatileTxs ? 4 : 2; - if (readSets.size() < expectedReadSets) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return readSets.size() >= expectedReadSets; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets); - - captureTxProposes = true; - - // Now we send a distributed read, while stopping coordinator proposals - auto fRead = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - SELECT * FROM `/Root/table-1` - UNION ALL - SELECT * FROM `/Root/table-2`)"), sessionRead, "", true)); - - // Wait until we capture the propose request - if (txProposes.size() < 1) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return txProposes.size() >= 1; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(txProposes.size(), 1u); - - Cerr << "---- captured propose for data tx ----" << Endl; - - // Now we send a copy table request, again blocking coordinator proposal - auto senderCopy = runtime.AllocateEdgeActor(); - auto txIdCopy = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-3", "/Root/table-2"); - - // Wait until we capture the propose request - if (txProposes.size() < 2) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return txProposes.size() >= 2; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(txProposes.size(), 2u); - - Cerr << "---- captured propose for copy tx ----" << Endl; - - // Stop capturing stuff - captureReadSets = false; - captureTxProposes = false; - - // Release copy tx propose and wait for plan steps (table-3, table-2 and schemeshard) - // It is important for copy tx to be planned *before* the read tx - seenPlanSteps = 0; - runtime.Send(txProposes[1].Release(), 0, /* via actor system */ true); - if (seenPlanSteps < 3) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return seenPlanSteps >= 3; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(seenPlanSteps, 3u); - - // Release read tx propose and wait for plan steps (table-1 and table-2) - // Now read tx will be planned *after* the copy tx - seenPlanSteps = 0; - runtime.Send(txProposes[0].Release(), 0, /* via actor system */ true); - if (seenPlanSteps < 2) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return seenPlanSteps >= 2; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(seenPlanSteps, 2u); - - // Sleep a little so that everything stops in a settled state - // Bug KIKIMR-7711 would cause copy tx and read tx to depend on each other - SimulateSleep(server, TDuration::Seconds(1)); - - // Release readsets, allowing the first commit to finish - for (auto& ev : readSets) { - runtime.Send(ev.Release(), 0, /* via actor system */ true); - } - - // Wait for commit to complete, it must succeed - { - auto response = AwaitResponse(runtime, fCommit); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - } - - // Wait for copy table tx to complete - WaitTxNotification(server, senderCopy, txIdCopy); - - // Wait for distributed read to complete, it must succeed - { - auto response = AwaitResponse(runtime, fRead); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - } - - // Verify copied table has data committed - UNIT_ASSERT_VALUES_EQUAL( - KqpSimpleExec(runtime, Q_(R"( - SELECT * FROM `/Root/table-3` - ORDER BY key)")), - "{ items { uint32_value: 2 } items { uint32_value: 1 } }, " - "{ items { uint32_value: 4 } items { uint32_value: 2 } }"); -} - -void VerifyNoActiveTransactions() { - using namespace NActors::NMemory::NPrivate; - const auto& instance = TMemoryTracker::Instance(); - instance->Initialize(); - std::vector metrics; - TMemoryTracker::Instance()->GatherMetrics(metrics); - - for (size_t i : xrange(metrics.size())) { - if (instance->GetName(i) == TString(MemoryLabelActiveTransactionBody)) { - UNIT_ASSERT_VALUES_EQUAL(metrics[i].GetCount(), 0); - return; - } - } - - UNIT_ASSERT_C(false, "Datashard/TActiveTransaction/TxBody metric not found"); -} - -Y_UNIT_TEST(TestPlannedCancelSplit) { - TPortManager pm; - NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); - TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetEnableMvccSnapshotReads(false); - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetAppConfig(app); - - Tests::TServer::TPtr server = new TServer(serverSettings); - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); - - InitRoot(server, sender); - - CreateShardedTable(server, sender, "/Root", "table-1", 1); - CreateShardedTable(server, sender, "/Root", "table-2", 1); - - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);")); - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);")); - - TString senderRead2 = CreateSessionRPC(runtime); - TString senderRead1 = CreateSessionRPC(runtime); - - auto shards1 = GetTableShards(server, sender, "/Root/table-1"); - UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); - auto shards2 = GetTableShards(server, sender, "/Root/table-2"); - UNIT_ASSERT_VALUES_EQUAL(shards2.size(), 1u); - TVector tablets; - tablets.push_back(shards1[0]); - tablets.push_back(shards2[0]); - - // Capture and block some messages - bool captureTxCancel = false; - bool captureTxPropose = false; - bool captureTxProposeResult = false; - TVector> txCancels; - TVector> txProposes; - TVector> txProposeResults; - auto captureMessages = [&](TAutoPtr &event) -> auto { - switch (event->GetTypeRewrite()) { - case TEvDataShard::EvProposeTransaction: - Cerr << "---- observed EvProposeTransaction ----" << Endl; - if (captureTxPropose) { - txProposes.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - case TEvDataShard::EvProposeTransactionResult: - Cerr << "---- observed EvProposeTransactionResult ----" << Endl; - if (captureTxProposeResult) { - txProposeResults.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - case TEvDataShard::EvCancelTransactionProposal: - Cerr << "---- observed EvCancelTransactionProposal ----" << Endl; - if (captureTxCancel) { - txCancels.push_back(std::move(event)); - return TTestActorRuntime::EEventAction::DROP; - } - break; - default: - break; - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(captureMessages); - - // Send a distributed read while capturing propose results - captureTxProposeResult = true; - auto fRead1 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - SELECT * FROM `/Root/table-1` - UNION ALL - SELECT * FROM `/Root/table-2`)"), senderRead1, "", true)); - if (txProposeResults.size() < 2) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return txProposeResults.size() >= 2; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(txProposeResults.size(), 2u); - captureTxProposeResult = false; - - // Remember which senders claim to be which shards - TVector actors(2); - for (auto& event : txProposeResults) { - TActorId actor = event->Sender; - const auto* msg = event->Get(); - ui64 shard = msg->Record.GetOrigin(); - for (size_t i = 0; i < tablets.size(); ++i) { - if (tablets[i] == shard) { - actors[i] = actor; - } - } - runtime.Send(event.Release(), 0, /* via actor system */ true); - } - txProposeResults.clear(); - - // Wait for the first query result, it must succeed - { - auto response = AwaitResponse(runtime, fRead1); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - } - - // Send a distributed read again, while blocking propose messages - captureTxPropose = true; - auto fRead2 = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( - SELECT * FROM `/Root/table-1` - UNION ALL - SELECT * FROM `/Root/table-2`)"), senderRead2, "", true)); - if (txProposes.size() < 2) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return txProposes.size() >= 2; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(txProposes.size(), 2u); - captureTxPropose = false; - - // Simulate propose overloaded at the second table - captureTxCancel = true; - for (auto& event : txProposes) { - if (event && event->GetRecipientRewrite() == actors[1]) { - Cerr << "---- found propose for table-2 ----" << Endl; - const auto* msg = event->Get(); - TActorId target = event->Sender; - auto* result = new TEvDataShard::TEvProposeTransactionResult( - msg->GetTxKind(), - tablets[1], - msg->GetTxId(), - NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED); - Cerr << "Sending error result from " << actors[1] << " to " << target << Endl; - runtime.Send(new IEventHandle(target, actors[1], result), 0, /* via actor system */ true); - event.Reset(); // drop this propose event - } - } - if (txCancels.size() < 1) { - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle &) -> bool { - return txCancels.size() >= 1; - }); - runtime.DispatchEvents(options); - } - UNIT_ASSERT_VALUES_EQUAL(txCancels.size(), 1u); - captureTxCancel = false; - - // Now send propose and cancel messages in quick succession - for (auto& event : txProposes) { - if (event) { - runtime.Send(event.Release(), 0, /* via actor system */ true); - } - } - for (auto& event : txCancels) { - runtime.Send(event.Release(), 0, /* via actor system */ true); - } - - // Wait for query to return an error - { - auto response = AwaitResponse(runtime, fRead2); - UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::OVERLOADED); - } - - // Sleep a little so in case of a bug transaction is left in WaitForPlan state - SimulateSleep(server, TDuration::Seconds(1)); - - // Split would fail otherwise :( - SetSplitMergePartCountLimit(server->GetRuntime(), -1); - - // Start split for table-1 - TInstant splitStart = TInstant::Now(); - auto senderSplit = runtime.AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-1", tablets[0], 100); - WaitTxNotification(server, senderSplit, txId); - - // Split shouldn't take too much time to complete - TDuration elapsed = TInstant::Now() - splitStart; - UNIT_ASSERT_C(elapsed < TDuration::Seconds(NValgrind::PlainOrUnderValgrind(2, 10)), - "Split needed " << elapsed.ToString() << " to complete, which is too long"); - - VerifyNoActiveTransactions(); -} - Y_UNIT_TEST(TestPlannedTimeoutSplit) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); @@ -4447,11 +3815,9 @@ Y_UNIT_TEST(TestSnapshotReadAfterStuckRW) { } } -Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { +Y_UNIT_TEST(TestSnapshotReadPriority) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(UnprotectedReads ? 1 : 0); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetControls(controls) @@ -4722,8 +4088,6 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadPriority, UnprotectedReads) { Y_UNIT_TEST(TestUnprotectedReadsThenWriteVisibility) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetNodeCount(2) @@ -5134,8 +4498,6 @@ Y_UNIT_TEST(UncommittedReadSetAck) { Y_UNIT_TEST(UncommittedReads) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 19ba5a426868..f34a7e09942b 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -722,8 +722,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotTailCleanup) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) @@ -806,8 +804,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotAndSplit) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetUseRealThreads(false) @@ -971,8 +967,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotReadWithLongPlanQueue) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -1349,8 +1343,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotLockedWrites) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -1443,8 +1435,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotLockedWritesRestart) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -1544,8 +1534,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotLockedWritesWithoutConflicts) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -1673,8 +1661,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotLockedWritesWithConflicts) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -1839,8 +1825,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotReadLockedWrites) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -1997,8 +1981,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(MvccSnapshotLockedWritesWithReadConflicts) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); NKikimrConfig::TAppConfig app; @@ -2146,8 +2128,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteBulkUpsertConflict) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2244,8 +2224,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteReuseAfterCommit) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2329,8 +2307,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteDistributedCommitSuccess) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2436,8 +2412,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteDistributedCommitAborted) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2548,8 +2522,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteDistributedCommitFreeze) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2675,8 +2647,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteDistributedCommitCrossConflict) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2857,8 +2827,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteCleanupOnSplit) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -2954,8 +2922,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteCleanupOnCopyTable) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -3076,8 +3042,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST_TWIN(LockedWriteWithAsyncIndex, WithRestart) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); @@ -3211,8 +3175,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWritesLimitedPerKey) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); controls.MutableDataShardControls()->SetMaxLockedWritesPerKey(2); @@ -3407,8 +3369,6 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Y_UNIT_TEST(LockedWriteWithAsyncIndexAndVolatileCommit) { TPortManager pm; TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); controls.MutableDataShardControls()->SetEnableLockedWrites(1); TServerSettings serverSettings(pm.GetPort(2134)); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 752631f4e7e8..6b2482cd90c3 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -56,7 +56,6 @@ class TTester : public TNonCopyable { bool DelayReadSet; bool DelayData; bool RebootOnDelay; - std::optional Mvcc; ui64 ExecutorCacheSize; TOptions(ui64 firstStep = 0) @@ -71,7 +70,6 @@ class TTester : public TNonCopyable { void EnableOutOfOrder(ui32 num = 8) { NumActiveTx = num; } void EnableSoftUpdates() { SoftUpdates = true; } - void EnableMvcc(std::optional enabled = {true}) { Mvcc = enabled; } TString PartConfig() const { TString pipelineConfig = Sprintf(R"(PipelineConfig { diff --git a/ydb/core/tx/tx_proxy/datareq.cpp b/ydb/core/tx/tx_proxy/datareq.cpp index 88e7744cb4e2..d5c66d3f6ab4 100644 --- a/ydb/core/tx/tx_proxy/datareq.cpp +++ b/ydb/core/tx/tx_proxy/datareq.cpp @@ -993,7 +993,6 @@ void TDataReq::ProcessFlatMKQLResolve(NSchemeCache::TSchemeCacheRequest *cacheRe rsCount == 0 && engine.GetAffectedShardCount() > 1 && ((TxFlags & NTxDataShard::TTxFlags::ForceOnline) == 0) && - AppData(ctx)->FeatureFlags.GetEnableMvccSnapshotReads() && !DatabaseName.empty()); if (forceSnapshot) { diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py index 2659b5db7a62..99d795fe989a 100644 --- a/ydb/tests/functional/serverless/test_serverless.py +++ b/ydb/tests/functional/serverless/test_serverless.py @@ -23,7 +23,6 @@ erasure=Erasure.NONE, nodes=1, enable_metering=True, - disable_mvcc=True, additional_log_configs={ 'TX_PROXY': LogLevels.DEBUG, 'KQP_PROXY': LogLevels.DEBUG, @@ -48,7 +47,10 @@ }, enforce_user_token_requirement=True, default_user_sid='user@builtin', - extra_feature_flags=['enable_serverless_exclusive_dynamic_nodes'] + extra_feature_flags=['enable_serverless_exclusive_dynamic_nodes'], + datashard_config={ + 'keep_snapshot_timeout': 5000, + }, ) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 41b409d66bb6..5a8c3a680758 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -143,7 +143,6 @@ def __init__( public_http_config=None, enable_datastreams=False, auth_config_path=None, - disable_mvcc=False, enable_public_api_external_blobs=False, node_kind=None, bs_cache_file_path=None, @@ -158,6 +157,7 @@ def __init__( extra_feature_flags=None, # list[str] extra_grpc_services=None, # list[str] hive_config=None, + datashard_config=None, enforce_user_token_requirement=False, default_user_sid=None ): @@ -260,7 +260,6 @@ def __init__( self.yaml_config["table_service_config"]["enable_kqp_data_query_stream_lookup"] = False self.yaml_config["feature_flags"]["enable_public_api_external_blobs"] = enable_public_api_external_blobs - self.yaml_config["feature_flags"]["enable_mvcc"] = "VALUE_FALSE" if disable_mvcc else "VALUE_TRUE" for extra_feature_flag in extra_feature_flags: self.yaml_config["feature_flags"][extra_feature_flag] = True if enable_alter_database_create_hive_first: @@ -344,6 +343,9 @@ def __init__( if hive_config: self.yaml_config["hive_config"] = hive_config + if datashard_config: + self.yaml_config["data_shard_config"] = datashard_config + self.__build() if self.grpc_ssl_enable: