Skip to content

Commit 24cb186

Browse files
fixes for TxWriteIndex volume control, policies of exceptions on searching, eviction tasks for portions usage (#1894)
1 parent ebb5c19 commit 24cb186

35 files changed

+317
-144
lines changed

ydb/core/formats/arrow/program.cpp

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -139,18 +139,20 @@ class TKernelFunction : public IStepFunction<TAssignObject> {
139139
TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx)
140140
: TBase(ctx)
141141
, Function(kernelsFunction)
142-
{}
142+
{
143+
AFL_VERIFY(Function);
144+
}
143145

144146
arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override {
145147
auto arguments = TBase::BuildArgs(batch, assign.GetArguments());
146148
if (!arguments) {
147149
return arrow::Status::Invalid("Error parsing args.");
148150
}
149-
try {
151+
// try {
150152
return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
151-
} catch (const std::exception& ex) {
152-
return arrow::Status::ExecutionError(ex.what());
153-
}
153+
// } catch (const std::exception& ex) {
154+
// return arrow::Status::ExecutionError(ex.what());
155+
// }
154156
}
155157
};
156158

ydb/core/tx/columnshard/blobs_action/tier/write.cpp

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@ void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/,
2424
for (auto&& i : GetBlobsForWrite()) {
2525
dbBlobs.RemoveTierDraftBlobId(GetStorageId(), i.first);
2626
dbBlobs.AddTierBlobToDelete(GetStorageId(), i.first);
27-
GCInfo->MutableBlobsToDelete().emplace_back(i.first);
2827
}
2928
}
3029
}
@@ -41,4 +40,12 @@ NKikimr::NOlap::TUnifiedBlobId TWriteAction::AllocateNextBlobId(const TString& d
4140
return TUnifiedBlobId(Max<ui32>(), TLogoBlobID(TabletId, now.GetValue() >> 32, now.GetValue() & Max<ui32>(), TLogoBlobID::MaxChannel, data.size(), AtomicIncrement(Counter) % TLogoBlobID::MaxCookie, 1));
4241
}
4342

43+
// Handles the "complete" step that follows a blob write for this tier action.
// When blobsWroteSuccessfully is false, every blob id returned by GetBlobsForWrite()
// is appended to GCInfo's blobs-to-delete list; on success nothing is done here.
// The shard reference is unused.
void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) {
44+
if (!blobsWroteSuccessfully) {
45+
for (auto&& i : GetBlobsForWrite()) {
46+
GCInfo->MutableBlobsToDelete().emplace_back(i.first);
47+
}
48+
}
49+
}
50+
4451
}

ydb/core/tx/columnshard/blobs_action/tier/write.h

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,7 @@ class TWriteAction: public IBlobsWritingAction {
2727
}
2828

2929
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override;
30-
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {
31-
32-
}
30+
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) override;
3331
public:
3432
virtual bool NeedDraftTransaction() const override {
3533
return true;

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
111111
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
112112
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
113113
ctx.Send(writeMeta.GetSource(), Results[i].release());
114-
Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
114+
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
115115
Self->CSCounters.OnSuccessWriteResponse();
116116
}
117117

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,4 +85,11 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr
8585
Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges);
8686
}
8787

88+
// Writes a short description of this transaction to `out`: the result of
// TypeName(*this), followed by ": " and IndexChanges->DebugString() when the
// event carries a non-null IndexChanges.
// NOTE(review): the function is declared noexcept, so DebugString() and the stream
// writes are presumably expected not to throw - confirm.
void TTxWriteIndex::Describe(IOutputStream& out) const noexcept {
89+
out << TypeName(*this);
90+
if (Ev->Get()->IndexChanges) {
91+
out << ": " << Ev->Get()->IndexChanges->DebugString();
92+
}
93+
}
94+
8895
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ class TTxWriteIndex: public TTransactionBase<TColumnShard> {
1616
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
1717
void Complete(const TActorContext& ctx) override;
1818
TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
19+
virtual void Describe(IOutputStream& out) const noexcept override;
1920

2021
private:
2122

ydb/core/tx/columnshard/blobs_reader/actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
1515
bool aborted = false;
1616
if (event.Status != NKikimrProto::EReplyStatus::OK) {
1717
WaitingBlobsCount.Sub(Task->GetWaitingCount());
18-
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"))) {
18+
// Attach at most the first 1024 characters of the reply payload to the error text.
// substr(1024) would instead drop the first 1024 characters and keep only the tail.
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(0, 1024)))) {
1919
aborted = true;
2020
}
2121
} else {

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 43 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
#include "columnshard_impl.h"
22
#include "blobs_action/transaction/tx_write.h"
33
#include "blobs_action/transaction/tx_draft.h"
4+
#include "counters/columnshard.h"
45
#include "operations/slice_builder.h"
56
#include "operations/write_data.h"
67

@@ -21,9 +22,17 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
2122
IncCounter(COUNTER_WRITE_OVERLOAD);
2223
CSCounters.OnOverloadInsertTable(writeData.GetSize());
2324
break;
24-
case EOverloadStatus::Shard:
25+
case EOverloadStatus::ShardTxInFly:
2526
IncCounter(COUNTER_WRITE_OVERLOAD);
26-
CSCounters.OnOverloadShard(writeData.GetSize());
27+
CSCounters.OnOverloadShardTx(writeData.GetSize());
28+
break;
29+
case EOverloadStatus::ShardWritesInFly:
30+
IncCounter(COUNTER_WRITE_OVERLOAD);
31+
CSCounters.OnOverloadShardWrites(writeData.GetSize());
32+
break;
33+
case EOverloadStatus::ShardWritesSizeInFly:
34+
IncCounter(COUNTER_WRITE_OVERLOAD);
35+
CSCounters.OnOverloadShardWritesSize(writeData.GetSize());
2736
break;
2837
case EOverloadStatus::None:
2938
Y_ABORT("invalid function usage");
@@ -45,8 +54,20 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId)
4554
return EOverloadStatus::InsertTable;
4655
}
4756

48-
if (WritesMonitor.ShardOverloaded()) {
49-
return EOverloadStatus::Shard;
57+
ui64 txLimit = Settings.OverloadTxInFlight;
58+
ui64 writesLimit = Settings.OverloadWritesInFlight;
59+
ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
60+
if (txLimit && Executor()->GetStats().TxInFly > txLimit) {
61+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)("limit", txLimit);
62+
return EOverloadStatus::ShardTxInFly;
63+
}
64+
if (writesLimit && WritesMonitor.GetWritesInFlight() > writesLimit) {
65+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", WritesMonitor.GetWritesInFlight())("limit", writesLimit);
66+
return EOverloadStatus::ShardWritesInFly;
67+
}
68+
if (writesSizeLimit && WritesMonitor.GetWritesSizeInFlight() > writesSizeLimit) {
69+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", WritesMonitor.GetWritesSizeInFlight())("limit", writesSizeLimit);
70+
return EOverloadStatus::ShardWritesSizeInFly;
5071
}
5172
return EOverloadStatus::None;
5273
}
@@ -57,7 +78,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
5778
auto& putResult = ev->Get()->GetPutResult();
5879
OnYellowChannels(putResult);
5980
NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer();
60-
auto& baseAggregations = wBuffer.GetAggregations();
81+
auto baseAggregations = wBuffer.GetAggregations();
82+
wBuffer.InitReplyReceived(TMonotonic::Now());
6183

6284
auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());
6385

@@ -70,13 +92,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
7092

7193
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
7294
ctx.Send(writeMeta.GetSource(), result.release());
73-
CSCounters.OnFailedWriteResponse();
95+
CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
7496
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
7597
continue;
7698
}
7799

78100
if (putResult.GetPutStatus() != NKikimrProto::OK) {
79-
CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
101+
CSCounters.OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
80102
IncCounter(COUNTER_WRITE_FAIL);
81103

82104
auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
@@ -97,16 +119,17 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
97119
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
98120
ctx.Send(writeMeta.GetSource(), result.release());
99121
}
100-
CSCounters.OnFailedWriteResponse();
122+
CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob);
101123
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
102124
} else {
103125
const TMonotonic now = TMonotonic::Now();
104-
CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
105-
CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds());
106-
CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds());
107-
CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds());
108-
CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds());
109-
CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds());
126+
CSCounters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant());
127+
CSCounters.OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant());
128+
CSCounters.OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant());
129+
CSCounters.OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant());
130+
CSCounters.OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant());
131+
CSCounters.OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant());
132+
CSCounters.OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant());
110133
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
111134
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
112135

@@ -139,18 +162,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
139162
IncCounter(signalIndex);
140163

141164
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
142-
CSCounters.OnFailedWriteResponse();
143165
return;
144166
};
145167

146168
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
147169
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled");
170+
CSCounters.OnFailedWriteResponse(EWriteFailReason::Disabled);
148171
return returnFail(COUNTER_WRITE_FAIL);
149172
}
150173

151174
if (!TablesManager.IsReadyForWrite(tableId)) {
152175
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
153176
<< " at tablet " << TabletID());
177+
178+
CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
154179
return returnFail(COUNTER_WRITE_FAIL);
155180
}
156181

@@ -159,6 +184,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
159184
if (!arrowData->ParseFromProto(record)) {
160185
LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId()
161186
<< " at tablet " << TabletID());
187+
CSCounters.OnFailedWriteResponse(EWriteFailReason::IncorrectSchema);
162188
return returnFail(COUNTER_WRITE_FAIL);
163189
}
164190

@@ -167,7 +193,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
167193
if (overloadStatus != EOverloadStatus::None) {
168194
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
169195
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
170-
CSCounters.OnFailedWriteResponse();
196+
CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload);
171197
} else {
172198
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
173199
LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
@@ -179,7 +205,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
179205
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
180206
TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
181207
ctx.Send(writeMeta.GetSource(), result.release());
182-
CSCounters.OnFailedWriteResponse();
208+
CSCounters.OnFailedWriteResponse(EWriteFailReason::LongTxDuplication);
183209
return;
184210
}
185211

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -296,6 +296,9 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
296296
failedAborts.push_back(writeId);
297297
}
298298
}
299+
if (failedAborts.size()) {
300+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
301+
}
299302
for (auto& writeId : failedAborts) {
300303
writesToAbort.erase(writeId);
301304
}
@@ -812,6 +815,7 @@ void TColumnShard::SetupCleanupInsertTable() {
812815
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
813816
return;
814817
}
818+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
815819

816820
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
817821
}

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 6 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -214,7 +214,9 @@ class TColumnShard
214214
void OnTieringModified();
215215
public:
216216
enum class EOverloadStatus {
217-
Shard /* "shard" */,
217+
ShardTxInFly /* "shard_tx" */,
218+
ShardWritesInFly /* "shard_writes" */,
219+
ShardWritesSizeInFly /* "shard_writes_size" */,
218220
InsertTable /* "insert_table" */,
219221
Disk /* "disk" */,
220222
None /* "none" */
@@ -298,8 +300,8 @@ class TColumnShard
298300
class TWritesMonitor {
299301
private:
300302
TColumnShard& Owner;
301-
ui64 WritesInFlight = 0;
302-
ui64 WritesSizeInFlight = 0;
303+
YDB_READONLY(ui64, WritesInFlight, 0);
304+
YDB_READONLY(ui64, WritesSizeInFlight, 0);
303305

304306
public:
305307
class TGuard: public TNonCopyable {
@@ -335,17 +337,8 @@ class TColumnShard
335337
return TGuard(*this);
336338
}
337339

338-
bool ShardOverloaded() const {
339-
ui64 txLimit = Owner.Settings.OverloadTxInFlight;
340-
ui64 writesLimit = Owner.Settings.OverloadWritesInFlight;
341-
ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight;
342-
return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) ||
343-
(writesLimit && WritesInFlight > writesLimit) ||
344-
(writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
345-
}
346-
347340
TString DebugString() const {
348-
return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)";
341+
return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight << "}";
349342
}
350343

351344
private:

0 commit comments

Comments
 (0)