Skip to content

Commit f94a19d

Browse files
authored
Merge 5f7fab9 into 57cf0e9
2 parents 57cf0e9 + 5f7fab9 commit f94a19d

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+1282
-694
lines changed

ydb/core/engine/minikql/minikql_engine_host.h

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "minikql_engine_host_counters.h"
34
#include "change_collector_iface.h"
45

56
#include <util/generic/cast.h>
@@ -12,56 +13,6 @@
1213
namespace NKikimr {
1314
namespace NMiniKQL {
1415

15-
struct TEngineHostCounters {
16-
ui64 NSelectRow = 0;
17-
ui64 NSelectRange = 0;
18-
ui64 NUpdateRow = 0;
19-
ui64 NEraseRow = 0;
20-
21-
ui64 SelectRowRows = 0;
22-
ui64 SelectRowBytes = 0;
23-
ui64 SelectRangeRows = 0;
24-
ui64 SelectRangeBytes = 0;
25-
ui64 SelectRangeDeletedRowSkips = 0;
26-
ui64 UpdateRowBytes = 0;
27-
ui64 EraseRowBytes = 0;
28-
29-
ui64 InvisibleRowSkips = 0;
30-
31-
TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
32-
NSelectRow += other.NSelectRow;
33-
NSelectRange += other.NSelectRange;
34-
NUpdateRow += other.NUpdateRow;
35-
NEraseRow += other.NEraseRow;
36-
SelectRowRows += other.SelectRowRows;
37-
SelectRowBytes += other.SelectRowBytes;
38-
SelectRangeRows += other.SelectRangeRows;
39-
SelectRangeBytes += other.SelectRangeBytes;
40-
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
41-
UpdateRowBytes += other.UpdateRowBytes;
42-
EraseRowBytes += other.EraseRowBytes;
43-
InvisibleRowSkips += other.InvisibleRowSkips;
44-
return *this;
45-
}
46-
47-
TString ToString() const {
48-
return TStringBuilder()
49-
<< "{NSelectRow: " << NSelectRow
50-
<< ", NSelectRange: " << NSelectRange
51-
<< ", NUpdateRow: " << NUpdateRow
52-
<< ", NEraseRow: " << NEraseRow
53-
<< ", SelectRowRows: " << SelectRowRows
54-
<< ", SelectRowBytes: " << SelectRowBytes
55-
<< ", SelectRangeRows: " << SelectRangeRows
56-
<< ", SelectRangeBytes: " << SelectRangeBytes
57-
<< ", UpdateRowBytes: " << UpdateRowBytes
58-
<< ", EraseRowBytes: " << EraseRowBytes
59-
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
60-
<< ", InvisibleRowSkips: " << InvisibleRowSkips
61-
<< "}";
62-
}
63-
};
64-
6516
struct IKeyAccessSampler : public TThrRefBase {
6617
using TPtr = TIntrusivePtr<IKeyAccessSampler>;
6718
virtual void AddSample(const TTableId& tableId, const TArrayRef<const TCell>& key) = 0;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#include "util/string/builder.h"
4+
#include "util/system/types.h"
5+
6+
namespace NKikimr {
7+
namespace NMiniKQL {
8+
9+
struct TEngineHostCounters {
10+
ui64 NSelectRow = 0;
11+
ui64 NSelectRange = 0;
12+
ui64 NUpdateRow = 0;
13+
ui64 NEraseRow = 0;
14+
15+
ui64 SelectRowRows = 0;
16+
ui64 SelectRowBytes = 0;
17+
ui64 SelectRangeRows = 0;
18+
ui64 SelectRangeBytes = 0;
19+
ui64 SelectRangeDeletedRowSkips = 0;
20+
ui64 UpdateRowBytes = 0;
21+
ui64 EraseRowBytes = 0;
22+
23+
ui64 InvisibleRowSkips = 0;
24+
25+
TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
26+
NSelectRow += other.NSelectRow;
27+
NSelectRange += other.NSelectRange;
28+
NUpdateRow += other.NUpdateRow;
29+
NEraseRow += other.NEraseRow;
30+
SelectRowRows += other.SelectRowRows;
31+
SelectRowBytes += other.SelectRowBytes;
32+
SelectRangeRows += other.SelectRangeRows;
33+
SelectRangeBytes += other.SelectRangeBytes;
34+
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
35+
UpdateRowBytes += other.UpdateRowBytes;
36+
EraseRowBytes += other.EraseRowBytes;
37+
InvisibleRowSkips += other.InvisibleRowSkips;
38+
return *this;
39+
}
40+
41+
TString ToString() const {
42+
return TStringBuilder()
43+
<< "{NSelectRow: " << NSelectRow
44+
<< ", NSelectRange: " << NSelectRange
45+
<< ", NUpdateRow: " << NUpdateRow
46+
<< ", NEraseRow: " << NEraseRow
47+
<< ", SelectRowRows: " << SelectRowRows
48+
<< ", SelectRowBytes: " << SelectRowBytes
49+
<< ", SelectRangeRows: " << SelectRangeRows
50+
<< ", SelectRangeBytes: " << SelectRangeBytes
51+
<< ", UpdateRowBytes: " << UpdateRowBytes
52+
<< ", EraseRowBytes: " << EraseRowBytes
53+
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
54+
<< ", InvisibleRowSkips: " << InvisibleRowSkips
55+
<< "}";
56+
}
57+
};
58+
}}

ydb/core/protos/tx_datashard.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,3 +1894,11 @@ message TEvOverloadReady {
18941894
message TEvOverloadUnsubscribe {
18951895
optional uint64 SeqNo = 1;
18961896
}
1897+
1898+
// Used for event serialization/deserialization
1899+
message TSerializedEvent {
1900+
// Bytes of a serialized TEventPBBase event.
1901+
optional bytes EventData = 1;
1902+
// Value of the TEventSerializationInfo::IsExtendedFormat flag
// (presumably the one that describes EventData; confirm against the writer).
1903+
optional bool IsExtendedFormat = 2;
1904+
}

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,15 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
113113
return EExecutionStatus::Executed;
114114
}
115115

116-
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
116+
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(
117+
DataShard.TabletID(),
118+
op->GetTxId(),
119+
{
120+
op->GetMinStep(),
121+
op->GetMaxStep(),
122+
DataShard.GetProcessingParams() ? DataShard.GetProcessingParams()->GetCoordinators() : google::protobuf::RepeatedField<ui64>{}
123+
}
124+
));
117125
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
118126
}
119127

ydb/core/tx/datashard/complete_data_tx_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ void TCompleteOperationUnit::CompleteOperation(TOperation::TPtr op,
9797
if (result) {
9898
result->Record.SetProposeLatency(duration.MilliSeconds());
9999

100-
DataShard.FillExecutionStats(op->GetExecutionProfile(), *result);
100+
DataShard.FillExecutionStats(op->GetExecutionProfile(), *result->Record.MutableTxStats());
101101

102102
if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
103103
result->Orbit = std::move(op->Orbit);
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#include "datashard_failpoints.h"
2+
#include "datashard_impl.h"
3+
#include "datashard_pipeline.h"
4+
#include "execution_unit_ctors.h"
5+
#include "probes.h"
6+
7+
#include <ydb/core/engine/minikql/minikql_engine_host.h>
8+
9+
LWTRACE_USING(DATASHARD_PROVIDER)
10+
11+
namespace NKikimr {
12+
namespace NDataShard {
13+
14+
using namespace NMiniKQL;
15+
16+
class TCompleteWriteUnit : public TExecutionUnit {
17+
public:
18+
TCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline);
19+
~TCompleteWriteUnit() override;
20+
21+
bool IsReadyToExecute(TOperation::TPtr op) const override;
22+
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext &txc,const TActorContext &ctx) override;
23+
void Complete(TOperation::TPtr op, const TActorContext &ctx) override;
24+
25+
private:
26+
void CompleteWrite(TOperation::TPtr op, const TActorContext &ctx);
27+
};
28+
29+
// Registers the unit with the base class under EExecutionUnitKind::CompleteWrite.
// NOTE(review): the meaning of the `false` base-constructor argument is not
// visible in this file; confirm against TExecutionUnit.
TCompleteWriteUnit::TCompleteWriteUnit(TDataShard& dataShard, TPipeline& pipeline)
    : TExecutionUnit(
        EExecutionUnitKind::CompleteWrite,
        false,
        dataShard,
        pipeline)
{}
33+
34+
// Nothing to release explicitly; the destructor stays defined out of line.
TCompleteWriteUnit::~TCompleteWriteUnit() = default;
37+
38+
// This unit never waits for anything: every operation is reported as ready.
bool TCompleteWriteUnit::IsReadyToExecute(TOperation::TPtr /*op*/) const {
    return true;
}
42+
43+
// Deactivates the operation in the pipeline and, when the operation carries a
// result, stamps its execution latency and registers the operation as
// completing. Always returns EExecutionStatus::DelayComplete.
//
// NOTE(review): this checks op->Result(), whereas CompleteWrite() works with
// the write result and calls Pipeline.RemoveCompletingOp() unconditionally.
// Confirm that the add/remove pairing is intended.
EExecutionStatus TCompleteWriteUnit::Execute(TOperation::TPtr op,
                                             TTransactionContext& txc,
                                             const TActorContext& ctx)
{
    Pipeline.DeactivateOp(op, txc, ctx);

    if (TOutputOpData::TResultPtr& opResult = op->Result(); opResult) {
        const auto latency = op->GetCompletedAt() - op->GetStartExecutionAt();
        opResult->Record.SetExecLatency(latency.MilliSeconds());

        Pipeline.AddCompletingOp(op);
    }

    // TODO: release snapshot used by a planned tx (not currently used)
    // TODO: prepared txs may be cancelled until planned, in which case we may
    // end up with a dangling snapshot reference. Such references would have
    // to be handled in a restart-safe manner too.
    Y_DEBUG_ABORT_UNLESS(!op->HasAcquiredSnapshotKey());

    return EExecutionStatus::DelayComplete;
}
67+
68+
// Finishes a write operation:
//  * logs / buffers the execution profile when the elapsed time reaches the
//    corresponding (non-zero) DataShard threshold;
//  * takes the write result out of the operation, fills its tx stats and sends
//    it to the operation target unless the skip-replies fail point fires;
//  * removes the operation from the pipeline's completing set.
void TCompleteWriteUnit::CompleteWrite(TOperation::TPtr op, const TActorContext& ctx)
{
    const auto elapsed = TAppData::TimeProvider->Now() - op->GetStartExecutionAt();
    const auto elapsedMs = elapsed.MilliSeconds();

    // A threshold of zero disables the corresponding action.
    if (DataShard.GetDataTxProfileLogThresholdMs()
        && elapsedMs >= DataShard.GetDataTxProfileLogThresholdMs())
    {
        LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
                   op->ExecutionProfileLogString(DataShard.TabletID()));
    }

    if (DataShard.GetDataTxProfileBufferThresholdMs()
        && elapsedMs >= DataShard.GetDataTxProfileBufferThresholdMs())
    {
        Pipeline.HoldExecutionProfile(op);
    }

    TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

    auto writeResult = writeOp->ReleaseWriteResult();
    if (writeResult) {
        auto& txStats = *writeResult->Record.MutableTxStats();
        DataShard.FillExecutionStats(op->GetExecutionProfile(), txStats);

        const bool skipReply = gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId());
        if (!skipReply) {
            writeResult->SetOrbit(std::move(op->Orbit));
            DataShard.SendWriteResult(ctx, writeResult, op->GetTarget(), op->GetStep(), op->GetTxId());
        }
    }

    Pipeline.RemoveCompletingOp(op);
}
97+
98+
// Completion stage of the unit. In order: drops the operation from the
// pipeline (committing set and tx table), bumps COUNTER_WRITE_SUCCESS,
// delivers the write result, then flushes delayed acks, change records and
// heartbeats.
void TCompleteWriteUnit::Complete(TOperation::TPtr op, const TActorContext& ctx)
{
    // Pipeline bookkeeping.
    Pipeline.RemoveCommittingOp(op);
    Pipeline.RemoveTx(op->GetStepOrder());

    DataShard.IncCounter(COUNTER_WRITE_SUCCESS);

    // Reply to the client.
    CompleteWrite(op, ctx);

    // Side effects accumulated by the operation.
    DataShard.SendDelayedAcks(ctx, op->DelayedAcks());
    DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
    DataShard.EmitHeartbeats();
}
111+
112+
// Factory: allocates a TCompleteWriteUnit bound to the given shard and
// pipeline and returns it as an owning pointer to the base interface.
THolder<TExecutionUnit> CreateCompleteWriteUnit(TDataShard& dataShard, TPipeline& pipeline)
{
    return THolder<TExecutionUnit>(new TCompleteWriteUnit(dataShard, pipeline));
}
116+
117+
} // namespace NDataShard
118+
} // namespace NKikimr

ydb/core/tx/datashard/datashard.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,14 +637,33 @@ void TDataShard::SendResult(const TActorContext &ctx,
637637
ctx.Send(target, res.Release(), flags);
638638
}
639639

640-
void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const {
640+
// Sends a write result event to `target`.
//
// Aborts if `txId` differs from the tx id stored in the result record.
// Ownership of the event is released from `result` into ctx.Send(), so
// `result` is empty when this function returns.
void TDataShard::SendWriteResult(const TActorContext& ctx,
                                 std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& result,
                                 const TActorId& target,
                                 ui64 step,
                                 ui64 txId)
{
    const auto recordTxId = result->Record.GetTxId();
    Y_ABORT_UNLESS(txId == recordTxId, "%" PRIu64 " vs %" PRIu64, txId, recordTxId);

    // TODO: Volatile
    /*
    if (VolatileTxManager.FindByTxId(txId)) {
        // This is a volatile transaction, and we need to wait until it is resolved
        bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileResult(this, std::move(result), target, step, txId));
        Y_ABORT_UNLESS(ok);
        return;
    }
    */

    // NOTE(review): the message prints TabletID() twice ("from" and "at tablet");
    // confirm whether one of them was meant to be a different id.
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Complete write [" << step << " : " << txId << "] from " << TabletID()
                << " at tablet " << TabletID()
                << " send result to client " << target);

    LWTRACK(ProposeTransactionSendResult, result->GetOrbit());
    ctx.Send(target, result.release(), 0);
}
658+
659+
void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, NKikimrQueryStats::TTxStats& txStats) const {
641660
TDuration totalCpuTime;
642661
for (const auto& unit : execProfile.UnitProfiles) {
643662
totalCpuTime += unit.second.ExecuteTime;
644663
totalCpuTime += unit.second.CompleteTime;
645664
}
646-
result.Record.MutableTxStats()->MutablePerShardStats()->Clear();
647-
auto& stats = *result.Record.MutableTxStats()->AddPerShardStats();
665+
txStats.MutablePerShardStats()->Clear();
666+
auto& stats = *txStats.AddPerShardStats();
648667
stats.SetShardId(TabletID());
649668
stats.SetCpuTimeUsec(totalCpuTime.MicroSeconds());
650669
}

ydb/core/tx/datashard/datashard__write.cpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
7272
return true;
7373
}
7474

75-
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, std::move(DatashardTransactionSpan));
75+
TOperation::TPtr op = Self->Pipeline.BuildOperation(std::move(Ev), ReceivedAt, TieBreakerIndex, txc, std::move(DatashardTransactionSpan));
76+
Y_ABORT_UNLESS(!Ev);
77+
7678
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
7779

7880
// Unsuccessful operation parse.
@@ -90,7 +92,6 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
9092
Self->Pipeline.GetExecutionUnit(op->GetCurrentUnit()).AddOperation(op);
9193

9294
Op = op;
93-
Ev = nullptr;
9495
Op->IncrementInProgress();
9596
}
9697

@@ -292,4 +293,22 @@ NKikimrDataEvents::TEvWriteResult::EStatus EvWrite::Convertor::ConvertErrCode(NK
292293
return NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
293294
}
294295
}
296+
297+
// Creates an operation object of the concrete type that matches `kind`.
//
//  * DataTx, SchemeTx, Snapshot, DistributedErase, CommitWrites, ReadTable
//      -> TActiveTransaction(info)
//  * WriteTx
//      -> TWriteOperation(info, tabletId)
//  * DirectTx, ReadTx, Unknown, or any value outside the enumerators listed
//    here -> the process is aborted.
TOperation::TPtr EvWrite::Convertor::MakeOperation(EOperationKind kind, const TBasicOpInfo& info, ui64 tabletId) {
    // Deliberately no `default:` label, so compiler switch-coverage
    // diagnostics stay useful when enumerators are added.
    switch (kind) {
        case EOperationKind::DataTx:
        case EOperationKind::SchemeTx:
        case EOperationKind::Snapshot:
        case EOperationKind::DistributedErase:
        case EOperationKind::CommitWrites:
        case EOperationKind::ReadTable:
            return MakeIntrusive<TActiveTransaction>(info);
        case EOperationKind::WriteTx:
            return MakeIntrusive<TWriteOperation>(info, tabletId);
        case EOperationKind::DirectTx:
        case EOperationKind::ReadTx:
        case EOperationKind::Unknown:
            Y_ABORT("Unsupported");
    }

    // Reached only when `kind` holds a value that matches none of the labels
    // above. Without this, control would flow off the end of a non-void
    // function, which is undefined behaviour.
    Y_ABORT("Unexpected operation kind");
}
295314
}

0 commit comments

Comments
 (0)