Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ ui64 TColumnShard::MemoryUsage() const {
ui64 memory =
ProgressTxController->GetMemoryUsage() +
ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
AltersInFlight.size() * sizeof(TAlterMeta) +
CommitsInFlight.size() * sizeof(TCommitMeta) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
(WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
Expand Down
42 changes: 1 addition & 41 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ void TTxInit::SetDefaults() {
Self->LastPlannedTxId = 0;
Self->OwnerPathId = 0;
Self->OwnerPath.clear();
Self->AltersInFlight.clear();
Self->CommitsInFlight.clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
}
Expand Down Expand Up @@ -180,18 +178,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}

{
TMemoryProfileGuard g("TTxInit/CommitsInFlight");
for (const auto& pr : Self->CommitsInFlight) {
ui64 txId = pr.first;
for (TWriteId writeId : pr.second.WriteIds) {
Y_ABORT_UNLESS(Self->LongTxWrites.contains(writeId),
"TTxInit at %" PRIu64 " : Commit %" PRIu64 " references local write %" PRIu64 " that doesn't exist",
Self->TabletID(), txId, writeId);
Self->AddLongTxWrite(writeId, txId);
}
}
}
Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Self->UpdateResourceMetrics(ctx, {});
Expand Down Expand Up @@ -219,6 +205,7 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}

void TTxInit::Complete(const TActorContext& ctx) {
Self->ProgressTxController->OnTabletInit();
Self->SwitchToWork(ctx);
}

Expand Down Expand Up @@ -377,31 +364,4 @@ void TColumnShard::Handle(TEvPrivate::TEvNormalizerResult::TPtr& ev, const TActo
Execute(new TTxApplyNormalizer(this, ev->Get()->GetChanges()), ctx);
}

bool TColumnShard::LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody) {
switch (txKind) {
case NKikimrTxColumnShard::TX_KIND_SCHEMA: {
TColumnShard::TAlterMeta meta;
Y_ABORT_UNLESS(meta.Body.ParseFromString(txBody));
AltersInFlight.emplace(txId, std::move(meta));
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
NKikimrTxColumnShard::TCommitTxBody body;
Y_ABORT_UNLESS(body.ParseFromString(txBody));

TColumnShard::TCommitMeta meta;
for (auto& id : body.GetWriteIds()) {
meta.AddWriteId(TWriteId{id});
}

CommitsInFlight.emplace(txId, std::move(meta));
break;
}
default: {
Y_ABORT("Unsupported TxKind stored in the TxInfo table");
}
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ class TTxNotifyTxCompletion : public TTransactionBase<TColumnShard> {
LOG_S_DEBUG("TTxNotifyTxCompletion.Execute at tablet " << Self->TabletID());

const ui64 txId = Ev->Get()->Record.GetTxId();

if (Self->AltersInFlight.contains(txId)) {
Self->AltersInFlight[txId].NotifySubscribers.insert(Ev->Sender);
auto txOperator = Self->ProgressTxController->GetTxOperator(txId);
if (txOperator) {
txOperator->RegisterSubscriber(Ev->Sender);
return true;
}

Result.reset(new TEvColumnShard::TEvNotifyTxCompletionResult(Self->TabletID(), txId));
return true;
}
Expand Down
121 changes: 5 additions & 116 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,6 @@
namespace NKikimr::NColumnShard {

class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
private:
struct TEvent {
TActorId Target;
ui64 Cookie;
THolder<IEventBase> Event;

TEvent(TActorId target, ui64 cookie, THolder<IEventBase> event)
: Target(target)
, Cookie(cookie)
, Event(std::move(event))
{ }
};

struct TResultEvent {
TTxController::TBasicTxInfo TxInfo;
NKikimrTxColumnShard::EResultStatus Status;

TResultEvent(TTxController::TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status)
: TxInfo(std::move(txInfo))
, Status(status)
{}

std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const {
if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) {
auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(tabletId, TxInfo.TxId);
return result;
} else {
auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
tabletId, TxInfo.TxKind, TxInfo.TxId, Status);
result->Record.SetStep(TxInfo.PlanStep);
return result;
}
}
};

enum class ETriggerActivities {
NONE,
POST_INSERT,
POST_SCHEMA
};

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxProgressTx[" << ToString(TabletTxNo) << "] ";
}

TString TxSuffix() const {
return TStringBuilder() << " at tablet " << Self->TabletID();
}

public:
TTxProgressTx(TColumnShard* self)
: TTransactionBase(self)
Expand All @@ -81,60 +32,8 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;

TTxController::TBasicTxInfo txInfo = *plannedItem;
switch (txInfo.TxKind) {
case NKikimrTxColumnShard::TX_KIND_SCHEMA:
{
auto& meta = Self->AltersInFlight.at(txId);
Self->RunSchemaTx(meta.Body, NOlap::TSnapshot(step, txId), txc);
Self->ProtectSchemaSeqNo(meta.Body.GetSeqNo(), txc);
for (TActorId subscriber : meta.NotifySubscribers) {
TxEvents.emplace_back(subscriber, 0,
MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId));
}
Self->AltersInFlight.erase(txId);
Trigger = ETriggerActivities::POST_SCHEMA;
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
const auto& meta = Self->CommitsInFlight.at(txId);

TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

auto pathExists = [&](ui64 pathId) {
return Self->TablesManager.HasTable(pathId);
};

auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.WriteIds,
pathExists);
Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
Self->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);

NIceDb::TNiceDb db(txc.DB);
for (TWriteId writeId : meta.WriteIds) {
Self->RemoveLongTxWrite(db, writeId, txId);
}
Self->CommitsInFlight.erase(txId);
Self->UpdateInsertTableCounters();
Trigger = ETriggerActivities::POST_INSERT;
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
NOlap::TSnapshot snapshot(step, txId);
Y_ABORT_UNLESS(Self->OperationsManager->CommitTransaction(*Self, txId, txc, snapshot));
Trigger = ETriggerActivities::POST_INSERT;
break;
}
default: {
Y_ABORT("Unexpected TxKind");
}
}

// Currently transactions never fail and there are no dependencies between them
TxResults.emplace_back(TResultEvent(std::move(txInfo), NKikimrTxColumnShard::SUCCESS));

TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
Self->ProgressTxController->FinishPlannedTx(txId, txc);
Self->RescheduleWaitingReads();
}
Expand All @@ -148,24 +47,14 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {

void Complete(const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");

for (auto& rec : TxEvents) {
ctx.Send(rec.Target, rec.Event.Release(), 0, rec.Cookie);
}

for (auto& res : TxResults) {
Self->ProgressTxController->CompleteRunningTx(TTxController::TPlanQueueItem(res.TxInfo.PlanStep, res.TxInfo.TxId));

auto event = res.MakeEvent(Self->TabletID());
ctx.Send(res.TxInfo.Source, event.release(), 0, res.TxInfo.Cookie);
if (TxOperator) {
TxOperator->Complete(*Self, ctx);
}
Self->SetupIndexation();
}

private:
std::vector<TResultEvent> TxResults;
std::vector<TEvent> TxEvents;
ETriggerActivities Trigger{ETriggerActivities::NONE};
TTxController::ITransactionOperatior::TPtr TxOperator;
const ui32 TabletTxNo;
};

Expand Down
Loading