Skip to content

Commit e17cbb5

Browse files
authored
Merge 71c80ed into 63653ea
2 parents 63653ea + 71c80ed commit e17cbb5

File tree

8 files changed

+81
-10
lines changed

8 files changed

+81
-10
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
6363
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
6464
}
6565

66+
TString MemoryConsumptionDetails() const override {
67+
return ResourceManager->GetTxResourcesUsageDebugInfo(TxId);
68+
}
69+
6670
void TerminateHandler(bool success, const NYql::TIssues& issues) {
6771
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
6872
("problem", "finish_compute_actor")

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ struct TTxState {
107107
ui32 TxExecutionUnits = 0;
108108
TInstant CreatedAt;
109109

110+
TString ToString() const {
111+
return TStringBuilder() << "TxResourcesInfo{ "
112+
<< "Memory initially granted resources: " << TxExternalDataQueryMemory
113+
<< ", extra allocations " << TxScanQueryMemory
114+
<< ", execution units: " << TxExecutionUnits
115+
<< ", started at: " << CreatedAt
116+
<< " }";
117+
}
118+
110119
TTaskState& Allocated(ui64 taskId, TInstant now, const TKqpResourcesRequest& resources, bool memoryAsExternal = false) {
111120
ui64 externalMemory = resources.ExternalMemory;
112121
ui64 resourceBrokerMemory = 0;
@@ -564,6 +573,19 @@ class TKqpResourceManager : public IKqpResourceManager {
564573
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
565574
}
566575

576+
TString GetTxResourcesUsageDebugInfo(ui64 txId) override {
577+
auto& txBucket = TxBucket(txId);
578+
with_lock(txBucket.Lock) {
579+
auto it = txBucket.Txs.find(txId);
580+
if (it == txBucket.Txs.end()) {
581+
return "<empty info>";
582+
}
583+
584+
return it->second.ToString();
585+
}
586+
}
587+
588+
567589
void UpdatePatternCache(ui64 maxSizeBytes, ui64 maxCompiledSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) {
568590
if (maxSizeBytes == 0) {
569591
PatternCache.reset();

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class IKqpResourceManager : private TNonCopyable {
9494

9595
virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
9696
virtual void FreeResources(ui64 txId, ui64 taskId) = 0;
97+
virtual TString GetTxResourcesUsageDebugInfo(ui64 txId) = 0;
9798

9899
virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
99100

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,27 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
128128
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
129129
}
130130

131+
Y_UNIT_TEST(ComputeActorMemoryAllocationFailure) {
132+
auto app = NKikimrConfig::TAppConfig();
133+
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10);
134+
app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000);
135+
136+
TKikimrRunner kikimr(app);
137+
CreateLargeTable(kikimr, 0, 0, 0);
138+
139+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
140+
141+
auto db = kikimr.GetTableClient();
142+
auto session = db.CreateSession().GetValueSync().GetSession();
143+
144+
auto result = session.ExecuteDataQuery(Q1_(R"(
145+
SELECT * FROM `/Root/LargeTable`;
146+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
147+
result.GetIssues().PrintTo(Cerr);
148+
149+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::OVERLOADED);
150+
}
151+
131152
Y_UNIT_TEST(DatashardProgramSize) {
132153
auto app = NKikimrConfig::TAppConfig();
133154
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,10 @@ struct TGuaranteeQuotaManager : public IMemoryQuotaManager {
331331
return MaxMemorySize;
332332
};
333333

334+
TString MemoryConsumptionDetails() const override {
335+
return TString();
336+
}
337+
334338
virtual bool AllocateExtraQuota(ui64) {
335339
return false;
336340
}

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct IMemoryQuotaManager {
6868
virtual ui64 GetCurrentQuota() const = 0;
6969
virtual ui64 GetMaxMemorySize() const = 0;
7070
virtual bool IsReasonableToUseSpilling() const = 0;
71+
virtual TString MemoryConsumptionDetails() const = 0;
7172
};
7273

7374
// Source/transform.

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
147147

148148
static_cast<TDerived*>(this)->DoBootstrap();
149149
} catch (const NKikimr::TMemoryLimitExceededException& e) {
150-
InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
151-
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
152-
<< ", host: " << HostName()
153-
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
150+
OnMemoryLimitExceptionHandler();
154151
} catch (const std::exception& e) {
155152
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what());
156153
}
@@ -225,10 +222,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
225222
TComputeActorClass* self = static_cast<TComputeActorClass*>(this);
226223
(self->*FuncBody)(ev);
227224
} catch (const NKikimr::TMemoryLimitExceededException& e) {
228-
InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
229-
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
230-
<< ", host: " << HostName()
231-
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
225+
OnMemoryLimitExceptionHandler();
232226
} catch (const std::exception& e) {
233227
if (PassExceptions) {
234228
throw;
@@ -316,6 +310,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
316310

317311
virtual bool DoHandleChannelsAfterFinishImpl() = 0;
318312

313+
void OnMemoryLimitExceptionHandler() {
314+
TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails();
315+
TStringBuilder failureReason = TStringBuilder()
316+
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
317+
<< ", host: " << HostName()
318+
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
319+
320+
if (!memoryConsumptionDetails.empty()) {
321+
failureReason << ", memory manager details: " << memoryConsumptionDetails;
322+
}
323+
324+
InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason);
325+
}
326+
319327
void ProcessOutputsImpl(ERunStatus status) {
320328
ProcessOutputsState.LastRunStatus = status;
321329

@@ -652,7 +660,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
652660
OutputTransformsMap.at(outputIndex).FinishIsAcknowledged = true;
653661
ContinueExecute(EResumeSource::CATransformFinished);
654662
}
655-
663+
656664
protected: //TDqComputeActorCheckpoints::ICallbacks
657665
//bool ReadyToCheckpoint() is pure and must be overriden in a derived class
658666

@@ -1491,7 +1499,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14911499
if (inputDesc.HasSource()) {
14921500
const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode();
14931501
auto result = SourcesMap.emplace(
1494-
i,
1502+
i,
14951503
static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode)
14961504
);
14971505
YQL_ENSURE(result.second);
@@ -1669,6 +1677,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16691677
} else {
16701678
// in basic mode enum sources directly
16711679
for (auto& [inputIndex, sourceInfo] : SourcesMap) {
1680+
if (!sourceInfo.AsyncInput)
1681+
continue;
1682+
16721683
const auto& ingressStats = sourceInfo.AsyncInput->GetIngressStats();
16731684
ingressBytes += ingressStats.Bytes;
16741685
// ingress rows are usually not reported, so we count rows in task runner input
@@ -1691,6 +1702,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16911702

16921703
for (auto& [outputIndex, sinkInfo] : SinksMap) {
16931704
if (auto* sink = GetSink(outputIndex, sinkInfo)) {
1705+
if (!sinkInfo.AsyncOutput)
1706+
continue;
1707+
16941708
const auto& egressStats = sinkInfo.AsyncOutput->GetEgressStats();
16951709
const auto& pushStats = sink->GetPushStats();
16961710
if (RuntimeSettings.CollectFull()) {

ydb/library/yql/providers/dq/actors/worker_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ class TDummyMemoryQuotaManager: public IMemoryQuotaManager {
8282
return std::numeric_limits<ui64>::max();
8383
}
8484

85+
TString MemoryConsumptionDetails() const override {
86+
return TString();
87+
}
88+
8589
bool IsReasonableToUseSpilling() const override {
8690
return false;
8791
}

0 commit comments

Comments
 (0)