Skip to content

Commit

Permalink
htap locks (#9121)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 12, 2024
1 parent c0d116a commit 67cacf0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
70 changes: 50 additions & 20 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
class TCommitOperation {
private:
const ui64 TabletId;
bool HtapFormat = false;

public:
using TPtr = std::shared_ptr<TCommitOperation>;
Expand All @@ -281,7 +282,7 @@ class TCommitOperation {

bool IsPrimary() const {
AFL_VERIFY(NeedSyncLocks());
return TabletId == *ReceivingShards.begin();
return TabletId == ArbiterColumnShard;
}

TCommitOperation(const ui64 tabletId)
Expand All @@ -293,13 +294,29 @@ class TCommitOperation {
auto& locks = evWrite.Record.GetLocks();
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
if ((ui32)locks.GetSendingShards().size() != SendingShards.size()) {
return TConclusionStatus::Fail("duplications in SendingShards proto field");
}
SendingColumnShards = std::set<ui64>(locks.GetSendingColumnShards().begin(), locks.GetSendingColumnShards().end());
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
if ((ui32)locks.GetReceivingShards().size() != ReceivingShards.size()) {
return TConclusionStatus::Fail("duplications in ReceivingShards proto field");
ReceivingColumnShards = std::set<ui64>(locks.GetReceivingColumnShards().begin(), locks.GetReceivingColumnShards().end());
HtapFormat = locks.HasArbiterColumnShard();
if (!ReceivingShards.size() || !SendingShards.size()) {
ReceivingShards.clear();
SendingShards.clear();
} else if (!HtapFormat) {
ArbiterColumnShard = *ReceivingShards.begin();
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
}
} else {
if (!ReceivingColumnShards.size() || !SendingColumnShards.size()) {
return TConclusionStatus::Fail("empty sending/receiving lists for columnshards is incorrect case");
}
ArbiterColumnShard = locks.GetArbiterColumnShard();
AFL_VERIFY(ArbiterColumnShard);
if (!ReceivingColumnShards.contains(TabletId) && !SendingColumnShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
}
}

TxId = evWrite.Record.GetTxId();
LockId = lock.GetLockId();
Generation = lock.GetGeneration();
Expand All @@ -313,26 +330,36 @@ class TCommitOperation {
if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
return TConclusionStatus::Fail("incorrect message type");
}
if (!ReceivingShards.size() || !SendingShards.size()) {
ReceivingShards.clear();
SendingShards.clear();
} else {
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
}
}
return TConclusionStatus::Success();
}

std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
const NKikimrTxColumnShard::ETransactionKind kind) const {
AFL_VERIFY(ReceivingShards.size());
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);
if (HtapFormat) {
if (IsPrimary()) {
std::set<ui64> fullReceiving = ReceivingShards;
fullReceiving.insert(ReceivingColumnShards.begin(), ReceivingColumnShards.end());
AFL_VERIFY(fullReceiving.size() + 1 == ReceivingShards.size() + ReceivingColumnShards.size());

std::set<ui64> fullSending = SendingShards;
fullSending.insert(SendingColumnShards.begin(), SendingColumnShards.end());
AFL_VERIFY(fullSending.size() + 1 == SendingShards.size() + SendingColumnShards.size());

return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
TFullTxInfo::BuildFake(kind), LockId, fullReceiving, fullSending);
} else {
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(TFullTxInfo::BuildFake(kind), LockId,
ArbiterColumnShard, ReceivingColumnShards.contains(TabletId));
}
} else {
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(
TFullTxInfo::BuildFake(kind), LockId, *ReceivingShards.begin(), ReceivingShards.contains(TabletId));
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);
} else {
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(TFullTxInfo::BuildFake(kind), LockId,
ArbiterColumnShard, ReceivingShards.contains(TabletId));
}
}
}

Expand All @@ -343,6 +370,9 @@ class TCommitOperation {
YDB_READONLY(ui64, TxId, 0);
YDB_READONLY_DEF(std::set<ui64>, SendingShards);
YDB_READONLY_DEF(std::set<ui64>, ReceivingShards);
YDB_READONLY_DEF(std::set<ui64>, SendingColumnShards);
YDB_READONLY_DEF(std::set<ui64>, ReceivingColumnShards);
ui64 ArbiterColumnShard = 0;
};

class TProposeWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
Expand Down Expand Up @@ -430,7 +460,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto source = ev->Sender;
const auto cookie = ev->Cookie;
const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviourConclusion.IsFail()) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
for (auto&& i : ReceivingShards) {
if (WaitShardsResultAck.contains(i)) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSet(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i,
true),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i,
owner.TabletID(), readSetData.SerializeAsString()),
i, true),
IEventHandle::FlagTrackDelivery, GetTxId());
}
}
Expand Down

0 comments on commit 67cacf0

Please sign in to comment.