Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stale read of some acknowledged writes after a table split #2286

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/create_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
txc.DB.NoMoreReadsForTx();
DataShard.SetPersistState(TShardState::Ready, txc);
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
DataShard.SendRegistrationRequestTimeCast(ctx);
}

return EExecutionStatus::DelayCompleteNoMoreRestarts;
Expand Down
68 changes: 58 additions & 10 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
if (RegistrationSended)
return;

if (!ProcessingParams)
if (!ProcessingParams) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State)
<< ": missing processing params");
return;
}

if (State == TShardState::WaitScheme ||
State == TShardState::SplitDstReceivingSnapshot)
{
// We don't have all the necessary info yet
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State));
return;
}

LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
<< DatashardStateName(State) << " tabletId " << TabletID()
Expand Down Expand Up @@ -2027,6 +2042,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
}
}

LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());

TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
Expand Down Expand Up @@ -2141,6 +2163,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
// We need to wait for completion until the flag is committed
res.WaitCompletion = true;
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
<< " promoting UnprotectedReadEdge to " << version);
SnapshotManager.PromoteUnprotectedReadEdge(version);

// We want to promote the complete edge when protected reads are
Expand Down Expand Up @@ -2303,6 +2327,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
const ui64 step = it->first.Step;

if (SrcSplitDescription) {
if (State == TShardState::SplitSrcSendingSnapshot ||
State == TShardState::SplitSrcWaitForPartitioningChanged ||
State == TShardState::PreOffline ||
State == TShardState::Offline)
{
// We cannot send replies, since dst shard is now in charge
// of keeping track of acknowledged writes. So we expect
// split src logic to reboot this shard later.
break;
}
}

if (step <= mediatorStep) {
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
Expand Down Expand Up @@ -2370,13 +2407,16 @@ void TDataShard::CheckMediatorStateRestored() {
// HEAD reads must include that in their results.
const ui64 waitStep = CoordinatorPrevReadStepMax;
const ui64 readStep = CoordinatorPrevReadStepMax;

LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
const ui64 observedStep = GetMaxObservedStep();
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
<< " waitStep# " << waitStep
<< " readStep# " << readStep
<< " observedStep# " << observedStep);

// WARNING: we must perform this check BEFORE we update unprotected read edge
// We may enter this code path multiple times, and we expect that the above
// read step may be refined while we wait based on pessimistic backup step.
if (GetMaxObservedStep() < waitStep) {
if (observedStep < waitStep) {
// We need to wait until we observe mediator step that is at least
// as large as the step we found.
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
Expand All @@ -2397,7 +2437,10 @@ void TDataShard::CheckMediatorStateRestored() {
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
? SnapshotManager.GetImmediateWriteEdge().Prev()
: TRowVersion::Min();
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
<< " promoting UnprotectedReadEdge to " << edge);
SnapshotManager.PromoteUnprotectedReadEdge(edge);
}

// Promote the replied immediate write edge up to the currently observed step
Expand All @@ -2406,7 +2449,7 @@ void TDataShard::CheckMediatorStateRestored() {
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
const TRowVersion edge(observedStep, Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
// Try to ensure writes become visible sooner rather than later
Expand Down Expand Up @@ -2543,6 +2586,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectDescriptions.push_back(TStringBuilder()
<< "is in process of split opId " << DstSplitOpId
<< " state " << DatashardStateName(State));
} else if (State == TShardState::WaitScheme) {
reject = true;
rejectReasons |= ERejectReasons::WrongState;
rejectDescriptions.push_back("is not created yet");
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
reject = true;
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
Expand Down Expand Up @@ -2705,6 +2752,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
auto* msg = ev->Get();
LWTRACK(ProposeTransactionRequest, msg->Orbit);

if (CheckDataTxRejectAndReply(ev, ctx)) {
IncCounter(COUNTER_PREPARE_REQUEST);
return;
}

// Check if we need to delay an immediate transaction
if (MediatorStateWaiting &&
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
Expand Down Expand Up @@ -2737,10 +2789,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc

IncCounter(COUNTER_PREPARE_REQUEST);

if (CheckDataTxRejectAndReply(ev, ctx)) {
return;
}

switch (ev->Get()->GetTxKind()) {
case NKikimrTxDataShard::TX_KIND_DATA:
case NKikimrTxDataShard::TX_KIND_SCAN:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_split_dst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
<< " from tabeltId " << srcTabletId);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());

if (!Self->DstSplitSchemaInitialized) {
LegacyInitSchema(txc);
Expand Down Expand Up @@ -293,6 +294,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

Self->State = TShardState::Ready;
Self->PersistSys(db, Schema::Sys_State, Self->State);
Self->SendRegistrationRequestTimeCast(ctx);
}

return true;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/datashard/datashard_split_src.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
}
}

if (!Self->MediatorDelayedReplies.empty()) {
// We have some pending mediator replies, which must not be replied.
// Unfortunately we may linger around for a long time, and clients
// would keep awaiting replies for all that time. We have to make
// sure those clients receive an appropriate disconnection error
// instead.
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
}

// TODO: properly check if there are no loans
Self->CheckStateChange(ctx);
}
Expand Down
Loading
Loading