Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions ydb/core/base/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ class TFeatureFlags: public NKikimrConfig::TFeatureFlags {
using TBase::TBase;
using TBase::operator=;

inline std::optional<bool> GetEnableMvcc() const {
switch (TBase::GetEnableMvcc()) {
case NKikimrConfig::TFeatureFlags::UNSET:
return std::nullopt;
case NKikimrConfig::TFeatureFlags::VALUE_TRUE:
return true;
case NKikimrConfig::TFeatureFlags::VALUE_FALSE:
return false;
}
}

inline void SetEnableBackgroundCompactionForTest(bool value) {
SetEnableBackgroundCompaction(value);
}
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
!ImmediateTx &&
!HasPersistentChannels &&
!HasOlapTable &&
(!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot) &&
AppData()->FeatureFlags.GetEnableMvccSnapshotReads()
(!Database.empty() || AppData()->EnableMvccSnapshotWithLegacyDomainRoot)
);

return forceSnapshot;
Expand Down Expand Up @@ -1272,6 +1271,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply"
<< (msg->NotDelivered ? " (last message not delivered)" : ""));

if (ReadOnlyTx && msg->NotDelivered) {
CancelProposal(msg->TabletId);
return ReplyUnavailable(TStringBuilder() << "Could not deliver program to shard " << msg->TabletId);
}

return ReplyTxStateUnknown(msg->TabletId);
}

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/session_actor/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ TKqpTransactionInfo TKqpTransactionContext::GetInfo() const {
bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx,
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery)
{
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE)
return false;
Y_UNUSED(config);

if (!config.FeatureFlags.GetEnableMvccSnapshotReads())
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE)
return false;

if (txCtx.GetSnapshot().IsValid())
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,6 @@ class TKikimrRunner {
return GetTestServer().GetRuntime()->WaitFuture(future);
}

bool IsUsingSnapshotReads() const {
return Server->GetRuntime()->GetAppData().FeatureFlags.GetEnableMvccSnapshotReads();
}

private:
void Initialize(const TKikimrSettings& settings);
void WaitForKqpProxyInit();
Expand Down
34 changes: 4 additions & 30 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,14 +856,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM `/Root/TwoShard` WHERE Key = 2;
)", TTxControl::Tx(*tx).CommitTx()).GetValueSync();

if (kikimr.IsUsingSnapshotReads()) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
UNIT_ASSERT(!result.IsSuccess());
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(LocksMultiShard) {
Expand All @@ -888,14 +881,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM `/Root/EightShard`;
)", TTxControl::Tx(*tx).CommitTx()).GetValueSync();

if (kikimr.IsUsingSnapshotReads()) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
UNIT_ASSERT(!result.IsSuccess());
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::ABORTED);
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(LocksMultiShardOk) {
Expand Down Expand Up @@ -1016,13 +1002,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT 42;
)", TTxControl::Tx(*tx).CommitTx()).GetValueSync();

if (kikimr.IsUsingSnapshotReads()) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
Cerr << result.GetIssues().ToString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(BrokenLocksAtROTxSharded) {
Expand Down Expand Up @@ -1050,13 +1030,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT 42;
)", TTxControl::Tx(*tx).CommitTx()).GetValueSync();

if (kikimr.IsUsingSnapshotReads()) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
Cerr << result.GetIssues().ToString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(BrokenLocksOnUpdate) {
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/ut/tx/kqp_tx_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,7 @@ Y_UNIT_TEST_SUITE(KqpTx) {
}

auto commitResult = tx->Commit().ExtractValueSync();
if (kikimr.IsUsingSnapshotReads()) {
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
UNIT_ASSERT(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED));
}
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::SUCCESS, commitResult.GetIssues().ToString());
}

Y_UNIT_TEST(EmptyTxOnCommit) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1112,13 +1112,14 @@ message TImmediateControlsConfig {
MaxValue: 134217728,
DefaultValue: 0 }];

// Note: these settings are deprecated (always enabled)
optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = {
Description: "Enables prioritized mvcc snapshot reads over immediate writes",
Description: "Enables prioritized mvcc snapshot reads over immediate writes (deprecated, always enabled)",
MinValue: 0,
MaxValue: 1,
DefaultValue: 1 }];
optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = {
Description: "Enables unprotected (fully readonly) mvcc snapshot reads",
Description: "Enables unprotected (fully readonly) mvcc snapshot reads (deprecated, always enabled)",
MinValue: 0,
MaxValue: 1,
DefaultValue: 1 }];
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ message TFeatureFlags {
optional bool EnableKqpScanOverPersistentSnapshot = 43 [default = true]; // deprecated: always true
optional bool EnableOlapSchemaOperations = 44 [default = true];
optional bool EnableVPatch = 45 [default = true];
optional bool EnableMvccSnapshotReads = 46 [default = true];
optional Tribool EnableMvcc = 47 [default = VALUE_TRUE];
optional bool EnableMvccSnapshotReads = 46 [default = true]; // deprecated: always true
optional Tribool EnableMvcc = 47 [default = VALUE_TRUE]; // deprecated: always true
optional bool EnableSchemeTransactionsAtSchemeShard = 48 [default = true];
optional bool EnableArrowFormatAtDatashard = 49 [default = false];
optional bool Enable3x3RequestsForMirror3DCMinLatencyPut = 50 [default = false];
Expand Down
1 change: 0 additions & 1 deletion ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableDataColumnForIndexTable)
FEATURE_FLAG_SETTER(EnableClockGettimeForUserCpuAccounting)
FEATURE_FLAG_SETTER(EnableOlapSchemaOperations)
FEATURE_FLAG_SETTER(EnableMvccSnapshotReads)
FEATURE_FLAG_SETTER(EnableBackgroundCompaction)
FEATURE_FLAG_SETTER(EnableBackgroundCompactionServerless)
FEATURE_FLAG_SETTER(EnableBorrowedSplitCompaction)
Expand Down
31 changes: 10 additions & 21 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, BackupReadAheadHi(0, 0, 128*1024*1024)
, TtlReadAheadLo(0, 0, 64*1024*1024)
, TtlReadAheadHi(0, 0, 128*1024*1024)
, EnablePrioritizedMvccSnapshotReads(1, 0, 1)
, EnableUnprotectedMvccSnapshotReads(1, 0, 1)
, EnableLockedWrites(1, 0, 1)
, MaxLockedWritesPerKey(1000, 0, 1000000)
, EnableLeaderLeases(1, 0, 1)
Expand Down Expand Up @@ -325,8 +323,6 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(TtlReadAheadLo, "DataShardControls.TtlReadAheadLo");
appData->Icb->RegisterSharedControl(TtlReadAheadHi, "DataShardControls.TtlReadAheadHi");

appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites");
appData->Icb->RegisterSharedControl(MaxLockedWritesPerKey, "DataShardControls.MaxLockedWritesPerKey");

Expand Down Expand Up @@ -2098,27 +2094,20 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
break;

case EPromotePostExecuteEdges::RepeatableRead: {
bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads();
if (unprotectedReads) {
// We want to use unprotected reads, but we need to make sure it's properly marked first
if (!SnapshotManager.GetPerformedUnprotectedReads()) {
SnapshotManager.SetPerformedUnprotectedReads(true, txc);
res.HadWrites = true;
}
if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
// We need to wait for completion until the flag is committed
res.WaitCompletion = true;
}
SnapshotManager.PromoteUnprotectedReadEdge(version);
} else if (SnapshotManager.GetPerformedUnprotectedReads()) {
// We want to drop the flag as soon as possible
SnapshotManager.SetPerformedUnprotectedReads(false, txc);
// We want to use unprotected reads, but we need to make sure it's properly marked first
if (!SnapshotManager.GetPerformedUnprotectedReads()) {
SnapshotManager.SetPerformedUnprotectedReads(true, txc);
res.HadWrites = true;
}
if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
// We need to wait for completion until the flag is committed
res.WaitCompletion = true;
}
SnapshotManager.PromoteUnprotectedReadEdge(version);

// We want to promote the complete edge when protected reads are
// used or when we're already writing something anyway.
if (res.HadWrites || !unprotectedReads) {
if (res.HadWrites) {
res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc);
if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) {
// We need to wait for completion because some other transaction
Expand Down Expand Up @@ -3333,7 +3322,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
auto& rec = msg->Record;
if (rec.HasMvccSnapshot()) {
TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge();
if (rowVersion >= unreadableEdge) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
LWTRACK(ProposeTransactionWaitSnapshot, msg->Orbit, rowVersion.Step, rowVersion.TxId);
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,13 +589,11 @@ class TDataShard::TTxInitSchema : public TTransactionBase<TDataShard> {

Self->PersistSys(db, Schema::Sys_State, Self->State);

if (AppData(ctx)->FeatureFlags.GetEnableMvcc()) {
auto state = *AppData(ctx)->FeatureFlags.GetEnableMvcc() ? EMvccState::MvccEnabled : EMvccState::MvccDisabled;
Self->PersistSys(db, Schema::SysMvcc_State, (ui32)state);
auto state = EMvccState::MvccEnabled;
Self->PersistSys(db, Schema::SysMvcc_State, (ui32)state);

LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, TStringBuilder() << "TxInitSchema.Execute"
<< " MVCC state switched to" << (*AppData(ctx)->FeatureFlags.GetEnableMvcc() ? " enabled" : " disabled") << " state");
}
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, TStringBuilder() << "TxInitSchema.Execute"
<< " MVCC state switched to enabled state");

Self->MvccSwitchState = TSwitchState::DONE;
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2026,8 +2026,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
snapshotUnavailable = true;
}
} else {
auto prioritizedMvccSnapshotReads = Self->GetEnablePrioritizedMvccSnapshotReads();
TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge(prioritizedMvccSnapshotReads);
TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge();
if (state.ReadVersion >= unreadableEdge) {
LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId);
Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(Ev), ctx);
Expand Down
12 changes: 0 additions & 12 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1646,16 +1646,6 @@ class TDataShard
return TtlReadAheadHi;
}

bool GetEnablePrioritizedMvccSnapshotReads() const {
ui64 value = EnablePrioritizedMvccSnapshotReads;
return value != 0;
}

bool GetEnableUnprotectedMvccSnapshotReads() const {
ui64 value = EnableUnprotectedMvccSnapshotReads;
return value != 0;
}

bool GetEnableLockedWrites() const {
ui64 value = EnableLockedWrites;
return value != 0;
Expand Down Expand Up @@ -2649,8 +2639,6 @@ class TDataShard
TControlWrapper TtlReadAheadLo;
TControlWrapper TtlReadAheadHi;

TControlWrapper EnablePrioritizedMvccSnapshotReads;
TControlWrapper EnableUnprotectedMvccSnapshotReads;
TControlWrapper EnableLockedWrites;
TControlWrapper MaxLockedWritesPerKey;

Expand Down
Loading