Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
042e320
TStoreWriteUnit
azevaykin Feb 5, 2024
90a7a28
TLoadWriteDetailsUnit
azevaykin Feb 6, 2024
1f9194a
Memory optimization
azevaykin Feb 7, 2024
01c0442
Tests
azevaykin Feb 7, 2024
7199f6c
TableAccessStats
azevaykin Feb 7, 2024
bff34e5
TEngineHostCounters in separate file
azevaykin Feb 7, 2024
ffe8f7b
Replace prepared EvProposeTransaction with EvWrite
azevaykin Feb 7, 2024
6abead2
Trivial fixes
azevaykin Feb 7, 2024
55b0103
Remove TFinalizeWriteTxPlanUnit
azevaykin Feb 8, 2024
879f2e3
std::optional KqpLocks
azevaykin Feb 8, 2024
42a8ba6
IsExtendedFormat
azevaykin Feb 8, 2024
1e2ca73
TPayloadHelper build fix
azevaykin Feb 9, 2024
86d5cfb
Store Record instead of Event
azevaykin Feb 9, 2024
4ad3ca6
GetTxBody/SetTxBody use proto for IsExtendedFormat
azevaykin Feb 9, 2024
5fb4675
bytes EventData
azevaykin Feb 9, 2024
720e6ec
Send propose to coordinator
azevaykin Feb 9, 2024
a0bd26a
remove TUserTable* TableInfo form TValidatedWriteTx
azevaykin Feb 12, 2024
e379b15
TEventHandle Leak fix. It should be under TAutoPtr to destroy it.
azevaykin Feb 12, 2024
b77853a
move event
azevaykin Feb 12, 2024
a833bd9
Rename Record->WriteRequest
azevaykin Feb 12, 2024
47a1991
Specify table in ReplaceEvProposeTransactionWithEvWrite
azevaykin Feb 12, 2024
cf14a70
TCompleteOperationUnit fix
azevaykin Feb 13, 2024
1eb47f8
KqpPrepareInReadsets fix
azevaykin Feb 13, 2024
1014949
static_pointer_cast
azevaykin Feb 13, 2024
10b0137
MakeOperation
azevaykin Feb 13, 2024
5f7fab9
TCompleteWriteUnit & TExecuteWriteUnit
azevaykin Feb 13, 2024
2991ba6
GetWriteResult
azevaykin Feb 14, 2024
46b95e4
NEvWrite::TConvertor
azevaykin Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 1 addition & 50 deletions ydb/core/engine/minikql/minikql_engine_host.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "minikql_engine_host_counters.h"
#include "change_collector_iface.h"

#include <util/generic/cast.h>
Expand All @@ -12,56 +13,6 @@
namespace NKikimr {
namespace NMiniKQL {

struct TEngineHostCounters {
ui64 NSelectRow = 0;
ui64 NSelectRange = 0;
ui64 NUpdateRow = 0;
ui64 NEraseRow = 0;

ui64 SelectRowRows = 0;
ui64 SelectRowBytes = 0;
ui64 SelectRangeRows = 0;
ui64 SelectRangeBytes = 0;
ui64 SelectRangeDeletedRowSkips = 0;
ui64 UpdateRowBytes = 0;
ui64 EraseRowBytes = 0;

ui64 InvisibleRowSkips = 0;

TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
NSelectRow += other.NSelectRow;
NSelectRange += other.NSelectRange;
NUpdateRow += other.NUpdateRow;
NEraseRow += other.NEraseRow;
SelectRowRows += other.SelectRowRows;
SelectRowBytes += other.SelectRowBytes;
SelectRangeRows += other.SelectRangeRows;
SelectRangeBytes += other.SelectRangeBytes;
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
UpdateRowBytes += other.UpdateRowBytes;
EraseRowBytes += other.EraseRowBytes;
InvisibleRowSkips += other.InvisibleRowSkips;
return *this;
}

TString ToString() const {
return TStringBuilder()
<< "{NSelectRow: " << NSelectRow
<< ", NSelectRange: " << NSelectRange
<< ", NUpdateRow: " << NUpdateRow
<< ", NEraseRow: " << NEraseRow
<< ", SelectRowRows: " << SelectRowRows
<< ", SelectRowBytes: " << SelectRowBytes
<< ", SelectRangeRows: " << SelectRangeRows
<< ", SelectRangeBytes: " << SelectRangeBytes
<< ", UpdateRowBytes: " << UpdateRowBytes
<< ", EraseRowBytes: " << EraseRowBytes
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
<< ", InvisibleRowSkips: " << InvisibleRowSkips
<< "}";
}
};

struct IKeyAccessSampler : public TThrRefBase {
using TPtr = TIntrusivePtr<IKeyAccessSampler>;
virtual void AddSample(const TTableId& tableId, const TArrayRef<const TCell>& key) = 0;
Expand Down
58 changes: 58 additions & 0 deletions ydb/core/engine/minikql/minikql_engine_host_counters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "util/string/builder.h"
#include "util/system/types.h"

namespace NKikimr {
namespace NMiniKQL {

struct TEngineHostCounters {
ui64 NSelectRow = 0;
ui64 NSelectRange = 0;
ui64 NUpdateRow = 0;
ui64 NEraseRow = 0;

ui64 SelectRowRows = 0;
ui64 SelectRowBytes = 0;
ui64 SelectRangeRows = 0;
ui64 SelectRangeBytes = 0;
ui64 SelectRangeDeletedRowSkips = 0;
ui64 UpdateRowBytes = 0;
ui64 EraseRowBytes = 0;

ui64 InvisibleRowSkips = 0;

TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
NSelectRow += other.NSelectRow;
NSelectRange += other.NSelectRange;
NUpdateRow += other.NUpdateRow;
NEraseRow += other.NEraseRow;
SelectRowRows += other.SelectRowRows;
SelectRowBytes += other.SelectRowBytes;
SelectRangeRows += other.SelectRangeRows;
SelectRangeBytes += other.SelectRangeBytes;
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
UpdateRowBytes += other.UpdateRowBytes;
EraseRowBytes += other.EraseRowBytes;
InvisibleRowSkips += other.InvisibleRowSkips;
return *this;
}

TString ToString() const {
return TStringBuilder()
<< "{NSelectRow: " << NSelectRow
<< ", NSelectRange: " << NSelectRange
<< ", NUpdateRow: " << NUpdateRow
<< ", NEraseRow: " << NEraseRow
<< ", SelectRowRows: " << SelectRowRows
<< ", SelectRowBytes: " << SelectRowBytes
<< ", SelectRangeRows: " << SelectRangeRows
<< ", SelectRangeBytes: " << SelectRangeBytes
<< ", UpdateRowBytes: " << UpdateRowBytes
<< ", EraseRowBytes: " << EraseRowBytes
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
<< ", InvisibleRowSkips: " << InvisibleRowSkips
<< "}";
}
};
}}
8 changes: 8 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1894,3 +1894,11 @@ message TEvOverloadReady {
message TEvOverloadUnsubscribe {
optional uint64 SeqNo = 1;
}

// Used for events serialization/deserialization
message TSerializedEvent {
// Serialized TEventPBBase event
optional bytes EventData = 1;
// TEventSerializationInfo::IsExtendedFormat flag
optional bool IsExtendedFormat = 2;
}
10 changes: 9 additions & 1 deletion ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,15 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
return EExecutionStatus::Executed;
}

writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(
DataShard.TabletID(),
op->GetTxId(),
{
op->GetMinStep(),
op->GetMaxStep(),
DataShard.GetProcessingParams() ? DataShard.GetProcessingParams()->GetCoordinators() : google::protobuf::RepeatedField<ui64>{}
}
));
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/complete_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void TCompleteOperationUnit::CompleteOperation(TOperation::TPtr op,
if (result) {
result->Record.SetProposeLatency(duration.MilliSeconds());

DataShard.FillExecutionStats(op->GetExecutionProfile(), *result);
DataShard.FillExecutionStats(op->GetExecutionProfile(), *result->Record.MutableTxStats());

if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
result->Orbit = std::move(op->Orbit);
Expand Down
114 changes: 114 additions & 0 deletions ydb/core/tx/datashard/complete_write_unit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include "datashard_failpoints.h"
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "probes.h"

#include <ydb/core/engine/minikql/minikql_engine_host.h>

LWTRACE_USING(DATASHARD_PROVIDER)

namespace NKikimr {
namespace NDataShard {

using namespace NMiniKQL;

class TCompleteWriteUnit : public TExecutionUnit {
public:
TCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline);
~TCompleteWriteUnit() override;

bool IsReadyToExecute(TOperation::TPtr op) const override;
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext &txc,const TActorContext &ctx) override;
void Complete(TOperation::TPtr op, const TActorContext &ctx) override;

private:
void CompleteWrite(TOperation::TPtr op, const TActorContext &ctx);
};

TCompleteWriteUnit::TCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
: TExecutionUnit(EExecutionUnitKind::CompleteWrite, false, dataShard, pipeline)
{
}

TCompleteWriteUnit::~TCompleteWriteUnit()
{
}

bool TCompleteWriteUnit::IsReadyToExecute(TOperation::TPtr) const
{
return true;
}

EExecutionStatus TCompleteWriteUnit::Execute(TOperation::TPtr op,
TTransactionContext &txc,
const TActorContext &ctx)
{
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

Pipeline.DeactivateOp(op, txc, ctx);

if (writeOp->GetWriteResult()) {
Pipeline.AddCompletingOp(op);
}

// TODO: release snapshot used by a planned tx (not currently used)
// TODO: prepared txs may be cancelled until planned, in which case we may
// end up with a dangling snapshot reference. Such references would have
// to be handled in a restart-safe manner too.
Y_DEBUG_ABORT_UNLESS(!op->HasAcquiredSnapshotKey());

return EExecutionStatus::DelayComplete;
}

void TCompleteWriteUnit::CompleteWrite(TOperation::TPtr op, const TActorContext& ctx)
{
auto duration = TAppData::TimeProvider->Now() - op->GetStartExecutionAt();

if (DataShard.GetDataTxProfileLogThresholdMs()
&& duration.MilliSeconds() >= DataShard.GetDataTxProfileLogThresholdMs()) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
op->ExecutionProfileLogString(DataShard.TabletID()));
}

if (DataShard.GetDataTxProfileBufferThresholdMs()
&& duration.MilliSeconds() >= DataShard.GetDataTxProfileBufferThresholdMs()) {
Pipeline.HoldExecutionProfile(op);
}

TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

auto result = writeOp->ReleaseWriteResult();
if (result) {
DataShard.FillExecutionStats(op->GetExecutionProfile(), *result->Record.MutableTxStats());

if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
result->SetOrbit(std::move(op->Orbit));
DataShard.SendWriteResult(ctx, result, op->GetTarget(), op->GetStep(), op->GetTxId());
}
}

Pipeline.RemoveCompletingOp(op);
}

void TCompleteWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)
{
Pipeline.RemoveCommittingOp(op);
Pipeline.RemoveTx(op->GetStepOrder());
DataShard.IncCounter(COUNTER_WRITE_SUCCESS);

CompleteWrite(op, ctx);

DataShard.SendDelayedAcks(ctx, op->DelayedAcks());

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats();
}

THolder<TExecutionUnit> CreateCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
{
return THolder(new TCompleteWriteUnit(dataShard, pipeline));
}

} // namespace NDataShard
} // namespace NKikimr
25 changes: 22 additions & 3 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,14 +637,33 @@ void TDataShard::SendResult(const TActorContext &ctx,
ctx.Send(target, res.Release(), flags);
}

void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const {
void TDataShard::SendWriteResult(const TActorContext& ctx, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& result, const TActorId& target, ui64 step, ui64 txId) {
Y_ABORT_UNLESS(txId == result->Record.GetTxId(), "%" PRIu64 " vs %" PRIu64, txId, result->Record.GetTxId());

// TODO: Volatile
/*
if (VolatileTxManager.FindByTxId(txId)) {
// This is a volatile transaction, and we need to wait until it is resolved
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileResult(this, std::move(result), target, step, txId));
Y_ABORT_UNLESS(ok);
return;
}
*/

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Complete write [" << step << " : " << txId << "] from " << TabletID() << " at tablet " << TabletID() << " send result to client " << target);

LWTRACK(ProposeTransactionSendResult, result->GetOrbit());
ctx.Send(target, result.release(), 0);
}

void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, NKikimrQueryStats::TTxStats& txStats) const {
TDuration totalCpuTime;
for (const auto& unit : execProfile.UnitProfiles) {
totalCpuTime += unit.second.ExecuteTime;
totalCpuTime += unit.second.CompleteTime;
}
result.Record.MutableTxStats()->MutablePerShardStats()->Clear();
auto& stats = *result.Record.MutableTxStats()->AddPerShardStats();
txStats.MutablePerShardStats()->Clear();
auto& stats = *txStats.AddPerShardStats();
stats.SetShardId(TabletID());
stats.SetCpuTimeUsec(totalCpuTime.MicroSeconds());
}
Expand Down
Loading