Skip to content

Commit b331ffe

Browse files
committed
TableAccessStats
1 parent f3139c8 commit b331ffe

File tree

5 files changed

+27
-13
lines changed

5 files changed

+27
-13
lines changed

ydb/core/tx/datashard/datashard_kqp.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -953,10 +953,8 @@ void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngi
953953
}
954954
}
955955

956-
void KqpFillTxStats(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters,
957-
TEvDataShard::TEvProposeTransactionResult& result)
956+
void KqpFillTxStats(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters, NKikimrQueryStats::TTxStats& stats)
958957
{
959-
auto& stats = *result.Record.MutableTxStats();
960958
auto& perTable = *stats.AddTableAccessStats();
961959
perTable.MutableTableInfo()->SetSchemeshardId(dataShard.GetPathOwnerId());
962960
Y_ABORT_UNLESS(dataShard.GetUserTables().size() == 1, "TODO: Fix handling of collocated tables");

ydb/core/tx/datashard/datashard_kqp.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <ydb/core/engine/minikql/minikql_engine_host.h>
1010
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
11+
#include <ydb/core/protos/query_stats.pb.h>
1112
#include <ydb/core/tx/locks/locks_db.h>
1213

1314
#include <util/generic/ptr.h>
@@ -48,8 +49,7 @@ void KqpCommitLocks(ui64 tabletId, const NKikimrDataEvents::TKqpLocks* kqpLocks,
4849

4950
void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters);
5051

51-
void KqpFillTxStats(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters,
52-
TEvDataShard::TEvProposeTransactionResult& result);
52+
void KqpFillTxStats(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters, NKikimrQueryStats::TTxStats& stats);
5353

5454
void KqpFillStats(TDataShard& dataShard, const NKqp::TKqpTasksRunner& tasksRunner,
5555
NMiniKQL::TKqpDatashardComputeContext& computeCtx, const NYql::NDqProto::EDqStatsMode& statsMode,

ydb/core/tx/datashard/datashard_ut_write.cpp

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,22 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
5959
auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}});
6060
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
6161
const ui64 shard = shards[0];
62-
6362
const ui32 rowCount = 3;
63+
6464
ui64 txId = 100;
6565

6666
Cout << "========= Send immediate write =========\n";
6767
{
6868
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
69+
6970
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
7071
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStep(), 0);
7172
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
7273
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
73-
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetDomainCoordinators().size(), 1);
74-
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTabletInfo().GetTabletId(), shard);
74+
75+
const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
76+
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
77+
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetUpdateRow().GetCount(), rowCount);
7578
}
7679

7780
Cout << "========= Read table =========\n";
@@ -87,13 +90,14 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
8790
auto opts = TShardedTableOptions().Columns({{"key64", "Uint64", true, false}, {"key32", "Uint32", true, false},
8891
{"value64", "Uint64", false, false}, {"value32", "Uint32", false, false}, {"valueUtf8", "Utf8", false, false}});
8992
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
90-
93+
const ui64 shard = shards[0];
9194
const ui32 rowCount = 3;
95+
9296
ui64 txId = 100;
9397

9498
Cout << "========= Send immediate write =========\n";
9599
{
96-
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
100+
Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
97101
}
98102

99103
Cout << "========= Read table =========\n";
@@ -110,6 +114,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
110114

111115
auto opts = TShardedTableOptions().Columns({{"key", "Utf8", true, false}});
112116
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
117+
const ui64 shard = shards[0];
113118

114119
Cout << "========= Send immediate write =========\n";
115120
{
@@ -120,7 +125,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
120125
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
121126
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
122127

123-
const auto writeResult = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
128+
const auto writeResult = Write(runtime, sender, shard, std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
124129
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
125130
UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600"));
126131
}
@@ -133,13 +138,14 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
133138
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
134139
const ui64 shard = shards[0];
135140
const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
141+
const ui32 rowCount = 3;
136142

137143
ui64 txId = 100;
138144
ui64 stepId;
139145

140146
Cout << "========= Send prepare =========\n";
141147
{
142-
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, 3, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
148+
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
143149

144150
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
145151
UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
@@ -162,11 +168,16 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
162168
{
163169
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
164170
auto writeResult = ev->Get()->Record;
171+
165172
UNIT_ASSERT_C(writeResult.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << writeResult.GetStatus() << " Issues: " << writeResult.GetIssues());
166173
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
167174
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStep(), stepId);
168175
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
169176
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
177+
178+
const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
179+
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
180+
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetUpdateRow().GetCount(), rowCount);
170181
}
171182

172183
Cout << "========= Read table =========\n";

ydb/core/tx/datashard/execute_data_tx_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
313313

314314
KqpUpdateDataShardStatCounters(DataShard, counters);
315315
if (tx->GetDataTx()->CollectStats()) {
316-
KqpFillTxStats(DataShard, counters, *result);
316+
KqpFillTxStats(DataShard, counters, *result->Record.MutableTxStats());
317317
}
318318

319319
if (counters.InvisibleRowSkips && op->LockTxId()) {

ydb/core/tx/datashard/write_unit.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class TWriteUnit : public TExecutionUnit {
144144
const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
145145

146146
DataShard.ReleaseCache(*writeOp);
147+
writeTx->GetUserDb().ResetCounters();
147148

148149
if (writeOp->IsTxDataReleased()) {
149150
switch (Pipeline.RestoreDataTx(writeOp, txc)) {
@@ -329,6 +330,10 @@ class TWriteUnit : public TExecutionUnit {
329330
op->ChangeRecords() = std::move(changes);
330331
}
331332

333+
auto& counters = writeTx->GetUserDb().GetCounters();
334+
KqpUpdateDataShardStatCounters(DataShard, counters);
335+
KqpFillTxStats(DataShard, counters, *writeResult->Record.MutableTxStats());
336+
332337
} catch (const TNeedGlobalTxId&) {
333338
Y_VERIFY_S(op->GetGlobalTxId() == 0,
334339
"Unexpected TNeedGlobalTxId exception for write operation with TxId# " << op->GetGlobalTxId());

0 commit comments

Comments
 (0)