Skip to content

Commit 64bee25

Browse files
authored
Merge bb04d46 into 8c72ada
2 parents 8c72ada + bb04d46 commit 64bee25

36 files changed

+950
-473
lines changed

ydb/core/engine/minikql/minikql_engine_host.h

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "minikql_engine_host_counters.h"
34
#include "change_collector_iface.h"
45

56
#include <util/generic/cast.h>
@@ -12,56 +13,6 @@
1213
namespace NKikimr {
1314
namespace NMiniKQL {
1415

15-
struct TEngineHostCounters {
16-
ui64 NSelectRow = 0;
17-
ui64 NSelectRange = 0;
18-
ui64 NUpdateRow = 0;
19-
ui64 NEraseRow = 0;
20-
21-
ui64 SelectRowRows = 0;
22-
ui64 SelectRowBytes = 0;
23-
ui64 SelectRangeRows = 0;
24-
ui64 SelectRangeBytes = 0;
25-
ui64 SelectRangeDeletedRowSkips = 0;
26-
ui64 UpdateRowBytes = 0;
27-
ui64 EraseRowBytes = 0;
28-
29-
ui64 InvisibleRowSkips = 0;
30-
31-
TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
32-
NSelectRow += other.NSelectRow;
33-
NSelectRange += other.NSelectRange;
34-
NUpdateRow += other.NUpdateRow;
35-
NEraseRow += other.NEraseRow;
36-
SelectRowRows += other.SelectRowRows;
37-
SelectRowBytes += other.SelectRowBytes;
38-
SelectRangeRows += other.SelectRangeRows;
39-
SelectRangeBytes += other.SelectRangeBytes;
40-
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
41-
UpdateRowBytes += other.UpdateRowBytes;
42-
EraseRowBytes += other.EraseRowBytes;
43-
InvisibleRowSkips += other.InvisibleRowSkips;
44-
return *this;
45-
}
46-
47-
TString ToString() const {
48-
return TStringBuilder()
49-
<< "{NSelectRow: " << NSelectRow
50-
<< ", NSelectRange: " << NSelectRange
51-
<< ", NUpdateRow: " << NUpdateRow
52-
<< ", NEraseRow: " << NEraseRow
53-
<< ", SelectRowRows: " << SelectRowRows
54-
<< ", SelectRowBytes: " << SelectRowBytes
55-
<< ", SelectRangeRows: " << SelectRangeRows
56-
<< ", SelectRangeBytes: " << SelectRangeBytes
57-
<< ", UpdateRowBytes: " << UpdateRowBytes
58-
<< ", EraseRowBytes: " << EraseRowBytes
59-
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
60-
<< ", InvisibleRowSkips: " << InvisibleRowSkips
61-
<< "}";
62-
}
63-
};
64-
6516
struct IKeyAccessSampler : public TThrRefBase {
6617
using TPtr = TIntrusivePtr<IKeyAccessSampler>;
6718
virtual void AddSample(const TTableId& tableId, const TArrayRef<const TCell>& key) = 0;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#include "util/string/builder.h"
4+
#include "util/system/types.h"
5+
6+
namespace NKikimr {
7+
namespace NMiniKQL {
8+
9+
struct TEngineHostCounters {
10+
ui64 NSelectRow = 0;
11+
ui64 NSelectRange = 0;
12+
ui64 NUpdateRow = 0;
13+
ui64 NEraseRow = 0;
14+
15+
ui64 SelectRowRows = 0;
16+
ui64 SelectRowBytes = 0;
17+
ui64 SelectRangeRows = 0;
18+
ui64 SelectRangeBytes = 0;
19+
ui64 SelectRangeDeletedRowSkips = 0;
20+
ui64 UpdateRowBytes = 0;
21+
ui64 EraseRowBytes = 0;
22+
23+
ui64 InvisibleRowSkips = 0;
24+
25+
TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
26+
NSelectRow += other.NSelectRow;
27+
NSelectRange += other.NSelectRange;
28+
NUpdateRow += other.NUpdateRow;
29+
NEraseRow += other.NEraseRow;
30+
SelectRowRows += other.SelectRowRows;
31+
SelectRowBytes += other.SelectRowBytes;
32+
SelectRangeRows += other.SelectRangeRows;
33+
SelectRangeBytes += other.SelectRangeBytes;
34+
SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;
35+
UpdateRowBytes += other.UpdateRowBytes;
36+
EraseRowBytes += other.EraseRowBytes;
37+
InvisibleRowSkips += other.InvisibleRowSkips;
38+
return *this;
39+
}
40+
41+
TString ToString() const {
42+
return TStringBuilder()
43+
<< "{NSelectRow: " << NSelectRow
44+
<< ", NSelectRange: " << NSelectRange
45+
<< ", NUpdateRow: " << NUpdateRow
46+
<< ", NEraseRow: " << NEraseRow
47+
<< ", SelectRowRows: " << SelectRowRows
48+
<< ", SelectRowBytes: " << SelectRowBytes
49+
<< ", SelectRangeRows: " << SelectRangeRows
50+
<< ", SelectRangeBytes: " << SelectRangeBytes
51+
<< ", UpdateRowBytes: " << UpdateRowBytes
52+
<< ", EraseRowBytes: " << EraseRowBytes
53+
<< ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips
54+
<< ", InvisibleRowSkips: " << InvisibleRowSkips
55+
<< "}";
56+
}
57+
};
58+
}}

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,15 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
113113
return EExecutionStatus::Executed;
114114
}
115115

116-
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
116+
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(
117+
DataShard.TabletID(),
118+
op->GetTxId(),
119+
{
120+
op->GetMinStep(),
121+
op->GetMaxStep(),
122+
DataShard.GetProcessingParams() ? DataShard.GetProcessingParams()->GetCoordinators() : google::protobuf::RepeatedField<ui64>{}
123+
}
124+
));
117125
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
118126
}
119127

ydb/core/tx/datashard/complete_data_tx_unit.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ EExecutionStatus TCompleteOperationUnit::Execute(TOperation::TPtr op,
7777
void TCompleteOperationUnit::CompleteOperation(TOperation::TPtr op,
7878
const TActorContext &ctx)
7979
{
80-
TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
81-
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
80+
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
81+
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
82+
Y_VERIFY_S(tx || writeOp, "cannot cast operation of kind " << op->GetKind());
8283

8384
auto duration = TAppData::TimeProvider->Now() - op->GetStartExecutionAt();
8485

ydb/core/tx/datashard/datashard__write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
7272
return true;
7373
}
7474

75-
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, std::move(DatashardTransactionSpan));
75+
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, std::move(DatashardTransactionSpan));
7676
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
7777

7878
// Unsuccessful operation parse.

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 4 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
2424
, EngineBay(self, txc, ctx, stepTxId)
2525
, ErrCode(NKikimrTxDataShard::TError::OK)
2626
, TxSize(0)
27-
, TxCacheUsage(0)
2827
, IsReleased(false)
2928
, BuiltTaskRunner(false)
3029
, IsReadOnly(true)
@@ -280,11 +279,10 @@ bool TValidatedDataTx::CheckCancelled(ui64 tabletId) {
280279
TInstant now = AppData()->TimeProvider->Now();
281280
Cancelled = (now >= Deadline());
282281

283-
Cancelled = Cancelled || gCancelTxFailPoint.Check(tabletId, TxId());
282+
Cancelled = Cancelled || gCancelTxFailPoint.Check(tabletId, GetTxId());
284283

285284
if (Cancelled) {
286-
LOG_NOTICE_S(*TlsActivationContext->ExecutorThread.ActorSystem, NKikimrServices::TX_DATASHARD,
287-
"CANCELLED TxId " << TxId() << " at " << tabletId);
285+
LOG_NOTICE_S(*TlsActivationContext->ExecutorThread.ActorSystem, NKikimrServices::TX_DATASHARD, "CANCELLED TxId " << GetTxId() << " at " << tabletId);
288286
}
289287
return Cancelled;
290288
}
@@ -332,7 +330,7 @@ void TActiveTransaction::FillTxData(TValidatedDataTx::TPtr dataTx)
332330
Y_ABORT_UNLESS(!DataTx);
333331
Y_ABORT_UNLESS(TxBody.empty() || HasVolatilePrepareFlag());
334332

335-
Target = dataTx->Source();
333+
Target = dataTx->GetSource();
336334
DataTx = dataTx;
337335

338336
if (DataTx->HasStreamResponse())
@@ -543,9 +541,7 @@ void TActiveTransaction::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBas
543541
DataTx->ReleaseTxData();
544542
// Immediate transactions have no body stored.
545543
if (!IsImmediate() && !HasVolatilePrepareFlag()) {
546-
UntrackMemory();
547-
TxBody.clear();
548-
TrackMemory();
544+
ClearTxBody();
549545
}
550546

551547
//InReadSets.clear();
@@ -687,50 +683,6 @@ void TActiveTransaction::FinalizeDataTxPlan()
687683
RewriteExecutionPlan(plan);
688684
}
689685

690-
class TFinalizeDataTxPlanUnit : public TExecutionUnit {
691-
public:
692-
TFinalizeDataTxPlanUnit(TDataShard &dataShard, TPipeline &pipeline)
693-
: TExecutionUnit(EExecutionUnitKind::FinalizeDataTxPlan, false, dataShard, pipeline)
694-
{ }
695-
696-
bool IsReadyToExecute(TOperation::TPtr) const override {
697-
return true;
698-
}
699-
700-
EExecutionStatus Execute(TOperation::TPtr op,
701-
TTransactionContext &txc,
702-
const TActorContext &ctx) override
703-
{
704-
Y_UNUSED(txc);
705-
Y_UNUSED(ctx);
706-
707-
TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
708-
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
709-
Y_VERIFY_S(tx->IsDataTx(), "unexpected non-data tx");
710-
711-
if (auto dataTx = tx->GetDataTx()) {
712-
// Restore transaction type flags
713-
if (dataTx->IsKqpDataTx() && !tx->IsKqpDataTransaction())
714-
tx->SetKqpDataTransactionFlag();
715-
Y_VERIFY_S(!dataTx->IsKqpScanTx(), "unexpected kqp scan tx");
716-
}
717-
718-
tx->FinalizeDataTxPlan();
719-
720-
return EExecutionStatus::Executed;
721-
}
722-
723-
void Complete(TOperation::TPtr op,
724-
const TActorContext &ctx) override
725-
{
726-
Y_UNUSED(op);
727-
Y_UNUSED(ctx);
728-
}
729-
};
730-
731-
THolder<TExecutionUnit> CreateFinalizeDataTxPlanUnit(TDataShard &dataShard, TPipeline &pipeline) {
732-
return THolder(new TFinalizeDataTxPlanUnit(dataShard, pipeline));
733-
}
734686

735687
void TActiveTransaction::BuildExecutionPlan(bool loaded)
736688
{

ydb/core/tx/datashard/datashard_active_transaction.h

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ struct TSchemaOperation {
114114
};
115115

116116
/// @note This class incapsulates Engine stuff for minor needs. Do not return TEngine out of it.
117-
class TValidatedDataTx : TNonCopyable {
117+
class TValidatedDataTx : TNonCopyable, public TValidatedTx {
118118
public:
119119
using TPtr = std::shared_ptr<TValidatedDataTx>;
120120

@@ -128,13 +128,15 @@ class TValidatedDataTx : TNonCopyable {
128128

129129
~TValidatedDataTx();
130130

131+
EType GetType() const override { return EType::DataTx; };
132+
131133
static constexpr ui64 MaxReorderTxKeys() { return 100; }
132134

133135
NKikimrTxDataShard::TError::EKind Code() const { return ErrCode; }
134136
const TString GetErrors() const { return ErrStr; }
135137

136138
TStepOrder StepTxId() const { return StepTxId_; }
137-
ui64 TxId() const { return StepTxId_.TxId; }
139+
ui64 GetTxId() const override { return StepTxId_.TxId; }
138140
const TString& Body() const { return TxBody; }
139141

140142
ui64 LockTxId() const { return Tx.GetLockTxId(); }
@@ -150,7 +152,6 @@ class TValidatedDataTx : TNonCopyable {
150152

151153
bool Ready() const { return ErrCode == NKikimrTxDataShard::TError::OK; }
152154
bool RequirePrepare() const { return ErrCode == NKikimrTxDataShard::TError::SNAPSHOT_NOT_READY_YET; }
153-
bool RequireWrites() const { return TxInfo().HasWrites() || !Immediate(); }
154155
bool HasWrites() const { return TxInfo().HasWrites(); }
155156
bool HasLockedWrites() const { return HasWrites() && LockTxId(); }
156157
bool HasDynamicWrites() const { return TxInfo().DynKeysCount != 0; }
@@ -192,10 +193,7 @@ class TValidatedDataTx : TNonCopyable {
192193
std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); }
193194
bool GetVolatileCommitOrdered() const { return EngineBay.GetVolatileCommitOrdered(); }
194195

195-
TActorId Source() const { return Source_; }
196-
void SetSource(const TActorId& actorId) { Source_ = actorId; }
197196
void SetStep(ui64 step) { StepTxId_.Step = step; }
198-
bool IsProposed() const { return Source_ != TActorId(); }
199197

200198
bool IsTableRead() const { return Tx.HasReadTableTransaction(); }
201199

@@ -272,9 +270,9 @@ class TValidatedDataTx : TNonCopyable {
272270

273271
ui64 GetTxSize() const { return TxSize; }
274272
ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; }
275-
276-
void SetTxCacheUsage(ui64 val) { TxCacheUsage = val; }
277-
ui64 GetTxCacheUsage() const { return TxCacheUsage; }
273+
ui64 GetMemoryConsumption() const override {
274+
return GetTxSize() + GetMemoryAllocated();
275+
}
278276

279277
void ReleaseTxData();
280278
bool IsTxDataReleased() const { return IsReleased; }
@@ -291,13 +289,11 @@ class TValidatedDataTx : TNonCopyable {
291289
private:
292290
TStepOrder StepTxId_;
293291
TString TxBody;
294-
TActorId Source_;
295292
TEngineBay EngineBay;
296293
NKikimrTxDataShard::TDataTransaction Tx;
297294
NKikimrTxDataShard::TError::EKind ErrCode;
298295
TString ErrStr;
299296
ui64 TxSize;
300-
ui64 TxCacheUsage;
301297
bool IsReleased;
302298
bool BuiltTaskRunner;
303299
TMaybe<ui64> PerShardKeysSizeLimitBytes_;
@@ -311,12 +307,6 @@ class TValidatedDataTx : TNonCopyable {
311307
void ComputeDeadline();
312308
};
313309

314-
enum class ERestoreDataStatus {
315-
Ok,
316-
Restart,
317-
Error,
318-
};
319-
320310
///
321311
class TDistributedEraseTx {
322312
public:

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2562,9 +2562,7 @@ class TDataShard
25622562
TInstant StartedKeyAccessSamplingAt;
25632563
TInstant StopKeyAccessSamplingAt;
25642564

2565-
using TTableInfos = THashMap<ui64, TUserTable::TCPtr>;
2566-
2567-
TTableInfos TableInfos; // tableId -> local table info
2565+
TUserTable::TTableInfos TableInfos; // tableId -> local table info
25682566
TTransQueue TransQueue;
25692567
TOutReadSets OutReadSets;
25702568
TPipeline Pipeline;

0 commit comments

Comments
 (0)