Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ydb/core/base/blobstorage.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/testlib/actors/block_events.h>

namespace NKikimr {

Expand Down Expand Up @@ -2988,6 +2989,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"ERROR: ABORTED");
}

Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, subkey, value)
VALUES (1, 1), (11, 11)
)");

TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
[actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
return ev->GetRecipientRewrite() == actor;
});

Cerr << "========= Starting upsert 1 =========" << Endl;
auto upsertFuture1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 2), (12, 12);
)");
runtime.SimulateSleep(TDuration::Seconds(1));

Cerr << "========= Starting upsert 2 =========" << Endl;
auto upsertFuture2 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 1002), (12, 1012);
)");
runtime.SimulateSleep(TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);

// We expect transaction to execute at shards[1]
// However at shards[0] it didn't even start due to blocked plans
// Now we need to restart both shards, without giving them a chance to communicate
std::vector<TActorId> shardActors{
ResolveTablet(runtime, shards.at(0)),
ResolveTablet(runtime, shards.at(1)),
};
for (auto& shardActor : shardActors) {
Cerr << "... killing actor " << shardActor << Endl;
// Perform a synchronous send, this makes sure both shards handle TEvPoison before anything else
runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
}

blockedPlan.Stop().clear();

// Both queries should abort with UNDETERMINED
Cerr << "... waiting for query results" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
"ERROR: UNDETERMINED");
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
"ERROR: UNDETERMINED");

// Split the second shard, which makes sure it's not stuck
Cerr << "========= Splitting shard 2 =========" << Endl;
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
Cerr << "... split txId# " << txId << " started" << Endl;
WaitTxNotification(server, sender, txId);
Cerr << "... split finished" << Endl;
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
9 changes: 4 additions & 5 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ namespace NKikimr::NDataShard {

void TVolatileTxManager::Start(const TActorContext& ctx) {
for (auto& pr : VolatileTxs) {
if (!pr.second->Dependencies.empty()) {
continue;
}
switch (pr.second->State) {
case EVolatileTxState::Waiting:
for (ui64 target : pr.second->Participants) {
Expand Down Expand Up @@ -875,7 +872,7 @@ namespace NKikimr::NDataShard {
if (info->AddCommitted) {
RunCommitCallbacks(info);
}
if (info->Dependencies.empty() && ReadyToDbCommit(info)) {
if (ReadyToDbCommit(info)) {
AddPendingCommit(txId);
}
}
Expand Down Expand Up @@ -926,7 +923,9 @@ namespace NKikimr::NDataShard {
case EVolatileTxState::Waiting:
break;
case EVolatileTxState::Committed:
AddPendingCommit(dependentTxId);
if (ReadyToDbCommit(dependent)) {
AddPendingCommit(dependentTxId);
}
break;
case EVolatileTxState::Aborting:
Y_ABORT("FIXME: unexpected dependency removed from aborting tx");
Expand Down