6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -537,7 +537,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
LOG_E("Shard " << tabletId << " transaction lost during reconnect: " << record.GetStatus());

CancelProposal(tabletId);
ReplyTxStateUnknown(tabletId);
ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << tabletId);
}

void HandlePrepare(TEvDqCompute::TEvState::TPtr& ev) {
@@ -579,7 +579,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
return ReplyUnavailable(TStringBuilder() << "Could not prepare program on shard " << msg->TabletId);
}

return ReplyTxStateUnknown(msg->TabletId);
return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
}

case TShardState::EState::Prepared: {
@@ -599,7 +599,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
<< (msg->NotDelivered ? ", last message not delivered" : ""));

CancelProposal(0);
return ReplyTxStateUnknown(msg->TabletId);
return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
}

case TShardState::EState::Initial:
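
The hunks above replace ReplyTxStateUnknown with ReplyUnavailable at the three points where the executer loses a shard in the prepare path. Since the transaction cannot have committed there (the proposal is cancelled first, or was never accepted), the client can safely be told UNAVAILABLE (retryable) instead of UNDETERMINED. A minimal self-contained sketch of that client-visible distinction; the enum and function names below are illustrative stand-ins, not the real YDB helpers:

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

// Hypothetical stand-ins for the real reply statuses (assumption: the actual
// executer surfaces Ydb::StatusIds values; this enum is illustrative only).
enum class EReplyStatus {
    Unavailable,   // transaction definitely did not commit; safe to retry
    Undetermined,  // commit outcome unknown; blind retries may double-apply
};

// Sketch of the decision this diff changes: once the proposal is cancelled
// on the shard (CancelProposal above), the transaction cannot commit there,
// so UNAVAILABLE is an accurate, retry-friendly answer.
EReplyStatus OnShardDisconnected(uint64_t tabletId, bool proposalCancelled, std::string& message) {
    std::ostringstream out;
    out << "Disconnected from shard " << tabletId;
    message = out.str();
    return proposalCancelled ? EReplyStatus::Unavailable : EReplyStatus::Undetermined;
}

int main() {
    std::string message;
    auto status = OnShardDisconnected(72075186224037888ull, /*proposalCancelled=*/true, message);
    std::cout << message << " -> "
              << (status == EReplyStatus::Unavailable ? "UNAVAILABLE" : "UNDETERMINED") << '\n';
}
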
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/datashard.cpp
@@ -538,7 +538,7 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
delayedAcks.clear();
}

void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
void TDataShard::GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
if (!op->HasOutputData()) {
// There are no replies
return;
@@ -562,6 +562,10 @@ void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::
expectedReadSets.clear();
}

void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
GetCleanupReplies(op.Get(), cleanupReplies);
}

void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
if (replies.empty()) {
return;
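
The datashard change above moves the body of GetCleanupReplies onto a raw-pointer overload and keeps the original TOperation::TPtr signature as a one-line forwarder, so callers that only hold a plain pointer (such as `this` inside TActiveTransaction::OnStopping below) can reuse it. A minimal sketch of the pattern, with std::shared_ptr standing in for the intrusive TOperation::TPtr and simplified types throughout:

#include <memory>
#include <vector>

struct TReply { int payload = 0; };

struct TOperationLike {
    bool hasOutputData = false;
};

// Implementation lives on the raw-pointer overload; callers that already hold
// a raw pointer (e.g. `this` inside a member function) pay no refcount churn.
void GetCleanupReplies(TOperationLike* op, std::vector<TReply>& replies) {
    if (!op->hasOutputData) {
        return;  // there are no replies
    }
    replies.push_back(TReply{42});
}

// Thin forwarder keeps the original smart-pointer signature source-compatible.
void GetCleanupReplies(const std::shared_ptr<TOperationLike>& op, std::vector<TReply>& replies) {
    GetCleanupReplies(op.get(), replies);
}

int main() {
    std::vector<TReply> replies;
    auto op = std::make_shared<TOperationLike>();
    op->hasOutputData = true;
    GetCleanupReplies(op, replies);        // smart-pointer call site
    GetCleanupReplies(op.get(), replies);  // raw-pointer call site
    return replies.size() == 2 ? 0 : 1;
}
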
33 changes: 31 additions & 2 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -911,12 +911,12 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
<< " because datashard "
<< self.TabletID()
<< " is restarting";
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
kind, self.TabletID(), GetTxId(), rejectStatus);
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);

ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
ctx.Send(GetTarget(), result.release(), 0, GetCookie());

self.IncCounter(COUNTER_PREPARE_OVERLOADED);
self.IncCounter(COUNTER_PREPARE_COMPLETE);
@@ -925,6 +925,35 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)

// Immediate ops become ready when stopping flag is set
return true;
} else if (HasVolatilePrepareFlag()) {
// Volatile transactions may be aborted at any time unless already executed
// Note: we need to send the result (and discard the transaction) as
// soon as possible, because new transactions are unlikely to execute
// and commits are even more likely to fail.
if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
auto status = NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED;
auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
kind, self.TabletID(), GetTxId(), status);
result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, TStringBuilder()
<< "DataShard " << self.TabletID() << " is restarting");
ctx.Send(GetTarget(), result.release(), 0, GetCookie());

// Make sure we also send acks and nodata readsets to expecting participants
std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
self.GetCleanupReplies(this, cleanupReplies);

for (auto& ev : cleanupReplies) {
TActivationContext::Send(ev.release());
}

SetResultSentFlag();
return true;
}

// Executed transactions will have to wait until committed
// There is no way to hand off committing volatile transactions for now
return false;
} else {
// Distributed operations send notification when proposed
if (GetTarget() && !HasCompletedFlag()) {
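
The new branch above handles volatile transactions during shutdown: if a transaction has not produced a result yet, the shard replies ABORTED immediately and emits the acks and nodata readsets other participants are waiting on (via GetCleanupReplies); if it already executed, it must stay until its commit is durable. (The same hunk also swaps THolder/Release() for std::unique_ptr/release(), a behavior-preserving modernization.) A condensed, hypothetical model of that decision, with plain bools in place of the real flag accessors:

#include <iostream>

// Condensed model of the OnStopping branch added above (names simplified).
// Returns true when the operation is "ready", i.e. can be discarded now.
bool OnStoppingVolatile(bool resultSent, bool hasResult, bool completed) {
    if (!resultSent && !hasResult && !completed) {
        // Not executed yet: reply ABORTED right away, then emit the acks and
        // nodata readsets other participants expect, so they do not block on
        // a restarting shard.
        return true;
    }
    // Already executed: the result must survive until the commit is durable;
    // there is no hand-off mechanism for committing volatile transactions yet.
    return false;
}

int main() {
    std::cout << std::boolalpha
              << OnStoppingVolatile(false, false, false) << "\n"   // abort now -> true
              << OnStoppingVolatile(false, true, false) << "\n";   // executed -> false
}
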
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
@@ -1493,6 +1493,7 @@ class TDataShard
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
void GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);
28 changes: 17 additions & 11 deletions ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -303,20 +303,26 @@ Y_UNIT_TEST(ProposeResultLost_RwTx) {
TestProposeResultLost(*fixture.Runtime, fixture.Client,
Q_(R"(
upsert into `/Root/table-1` (key, value) VALUES
(1, 1), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
(1, 11), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
)"),
[](const NKikimrKqp::TEvQueryResponse& record) {
UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED, record.DebugString());

TIssues issues;
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
"State of operation is unknown."), record.GetResponse().DebugString());

UNIT_ASSERT_C(HasIssue(issues, NKikimrIssues::TIssuesIds::TX_STATE_UNKNOWN, "", [] (const TIssue& issue) {
return issue.GetMessage().StartsWith("Tx state unknown for shard ");
}), record.GetResponse().DebugString());
UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE, record.DebugString());

TIssues issues;
IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
UNIT_ASSERT_C(
HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
"Kikimr cluster or one of its subsystems was unavailable."),
record.GetResponse().DebugString());
});

// Verify that the transaction didn't commit
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(*fixture.Runtime,
Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");
}

} // suite
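
With the executer now replying UNAVAILABLE, this test flips its expectation from UNDETERMINED with KIKIMR_OPERATION_STATE_UNKNOWN to KIKIMR_TEMPORARILY_UNAVAILABLE, and adds a read-back proving the upsert never committed. A small sketch of the client-side policy this distinction enables; the status enum is illustrative, not the actual Ydb::StatusIds:

#include <iostream>

enum class EQueryStatus { Success, Unavailable, Undetermined };

// In this PR's scenario, UNAVAILABLE is returned only when the proposal was
// cancelled before commit, so the write did not happen and a blind retry is
// safe. UNDETERMINED gives no such guarantee: the caller must first check
// whether the previous attempt's effects are visible, or use idempotent writes.
bool IsSafeToBlindlyRetry(EQueryStatus status) {
    return status == EQueryStatus::Unavailable;
}

int main() {
    std::cout << std::boolalpha
              << IsSafeToBlindlyRetry(EQueryStatus::Unavailable) << "\n"    // true
              << IsSafeToBlindlyRetry(EQueryStatus::Undetermined) << "\n";  // false
}
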
210 changes: 206 additions & 4 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -3020,10 +3020,12 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, subkey, value)
VALUES (1, 1), (11, 11)
)");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (1, 1), (11, 11)
)"),
"<empty>");

TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
@@ -3080,6 +3082,206 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
Cerr << "... split finished" << Endl;
}

Y_UNIT_TEST(DistributedUpsertRestartBeforePrepare) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

Cerr << "========= Creating the table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (1, 1), (11, 11)
)"),
"<empty>");

TBlockEvents<TEvDataShard::TEvProposeTransaction> blockedPrepare(runtime);

Cerr << "========= Starting upsert 1 =========" << Endl;
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 2), (12, 12);
)");

runtime.WaitFor("prepare requests", [&]{ return blockedPrepare.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);

blockedPrepare.Stop();

Cerr << "========= Restarting shard 1 =========" << Endl;
GracefulRestartTablet(runtime, shards.at(0), sender);

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
"ERROR: UNAVAILABLE");
}

Y_UNIT_TEST(DistributedUpsertRestartAfterPrepare) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

Cerr << "========= Creating the table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (1, 1), (11, 11)
)"),
"<empty>");

TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedPrepare(runtime);

Cerr << "========= Starting upsert 1 =========" << Endl;
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 2), (12, 12);
)");

runtime.WaitFor("prepare results", [&]{ return blockedPrepare.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);

for (auto& ev : blockedPrepare) {
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus(), NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED);
}

// Unblock prepare results and restart the first shard
blockedPrepare.Stop().Unblock();

Cerr << "========= Restarting shard 1 =========" << Endl;
GracefulRestartTablet(runtime, shards.at(0), sender);

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
"ERROR: ABORTED");
}

Y_UNIT_TEST(DistributedUpsertRestartAfterPlan) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

Cerr << "========= Creating the table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (1, 1), (11, 11)
)"),
"<empty>");

TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);

Cerr << "========= Starting upsert 1 =========" << Endl;
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 2), (12, 12);
)");

runtime.WaitFor("shard plans", [&]{ return blockedPlan.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);

// Block TEvPrivate::TEvProgressTransaction for shard1
auto shard1actor = ResolveTablet(runtime, shards.at(0));
TBlockEvents<IEventHandle> blockedProgress(runtime,
[&](const TAutoPtr<IEventHandle>& ev) {
return ev->GetRecipientRewrite() == shard1actor &&
ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0;
});

// Unblock prepare results and restart the first shard
blockedPlan.Stop().Unblock();
runtime.WaitFor("blocked progress", [&]{ return blockedProgress.size() >= 1; });
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1u);

Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

Cerr << "========= Restarting shard 1 =========" << Endl;
GracefulRestartTablet(runtime, shards.at(0), sender);

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
"ERROR: ABORTED");

Cerr << "========= Checking table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table`
ORDER BY key;
)"),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 11 } items { uint32_value: 11 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
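
The three new tests share one pattern: park a specific event type with TBlockEvents, wait until both shards reach the desired phase (before prepare, after prepare, after plan), restart the first shard, and assert that the client sees UNAVAILABLE or ABORTED accordingly. A generic, self-contained sketch of that interception pattern; TBlockEventsLike and TEvent are hypothetical stand-ins for the actor-test framework types:

#include <deque>
#include <iostream>
#include <string>

// Hypothetical stand-in for the test runtime's event interception: events
// matching the configured type are parked in a queue instead of delivered.
struct TEvent { std::string type; };

class TBlockEventsLike {
public:
    explicit TBlockEventsLike(std::string type) : Type_(std::move(type)) {}

    // Returns true when the event was captured (blocked) instead of delivered.
    bool MaybeBlock(const TEvent& ev) {
        if (!Stopped_ && ev.type == Type_) {
            Blocked_.push_back(ev);
            return true;
        }
        return false;
    }

    size_t size() const { return Blocked_.size(); }

    // Stop capturing new events; already-captured ones stay parked.
    TBlockEventsLike& Stop() { Stopped_ = true; return *this; }

    // Release parked events for delivery (here: just report them).
    void Unblock() {
        for (auto& ev : Blocked_) {
            std::cout << "delivering parked " << ev.type << "\n";
        }
        Blocked_.clear();
    }

private:
    std::string Type_;
    bool Stopped_ = false;
    std::deque<TEvent> Blocked_;
};

int main() {
    TBlockEventsLike blocked("TEvProposeTransaction");
    blocked.MaybeBlock(TEvent{"TEvProposeTransaction"});  // shard 1's proposal
    blocked.MaybeBlock(TEvent{"TEvProposeTransaction"});  // shard 2's proposal
    std::cout << "parked: " << blocked.size() << "\n";    // the test's WaitFor condition
    blocked.Stop().Unblock();  // stop intercepting, release parked events, restart shard
}
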