Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
}

request.SetSchedulerGroup(UserRequestContext->PoolId);
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
request.SetDatabase(Database);
if (UserRequestContext->PoolConfig.has_value()) {
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
}

return result;
}
Expand Down Expand Up @@ -351,9 +354,14 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
NYql::NDq::TComputeRuntimeSettings settings;
if (!TxInfo) {
double memoryPoolPercent = 100;
if (UserRequestContext->PoolConfig.has_value()) {
memoryPoolPercent = UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode;
}

TxInfo = MakeIntrusive<NRm::TTxState>(
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
UserRequestContext->PoolId, UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
UserRequestContext->PoolId, memoryPoolPercent, Database);
}

auto startResult = CaFactory_->CreateKqpComputeActor({
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
txId, TInstant::Now(), ResourceManager_->GetCounters(),
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent());
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
msg.GetDatabase());

const ui32 tasksCount = msg.GetTasks().size();
for (auto& dqTask: *msg.MutableTasks()) {
Expand Down
34 changes: 25 additions & 9 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class TMemoryResource : public TAtomicRefCount<TMemoryResource> {
SpillingCookie->SpillingPercentReached.store(Available() < OverLimit);
}

ui64 GetUsed() const {
return Used;
}

void Release(ui64 value) {
if (Used > value) {
Used -= value;
Expand All @@ -103,7 +107,7 @@ class TMemoryResource : public TAtomicRefCount<TMemoryResource> {
}

void SetNewLimit(ui64 baseLimit, double memoryPoolPercent, double overPercent) {
if (abs(memoryPoolPercent - MemoryPoolPercent) < MYEPS && baseLimit != BaseLimit)
if (abs(memoryPoolPercent - MemoryPoolPercent) < MYEPS && baseLimit == BaseLimit)
return;

BaseLimit = baseLimit;
Expand Down Expand Up @@ -256,7 +260,7 @@ class TKqpResourceManager : public IKqpResourceManager {
task->TotalMemoryCookie = TotalMemoryResource->GetSpillingCookie();

if (hasScanQueryMemory && !tx->PoolId.empty() && tx->MemoryPoolPercent > 0) {
auto [it, success] = MemoryNamedPools.emplace(tx->PoolId, nullptr);
auto [it, success] = MemoryNamedPools.emplace(tx->MakePoolId(), nullptr);

if (success) {
it->second = MakeIntrusive<TMemoryResource>(TotalMemoryResource->GetLimit(), tx->MemoryPoolPercent, SpillingPercent.load());
Expand Down Expand Up @@ -291,9 +295,15 @@ class TKqpResourceManager : public IKqpResourceManager {
Counters->RmNotEnoughMemory->Inc();
with_lock (Lock) {
TotalMemoryResource->Release(resources.Memory);
auto it = MemoryNamedPools.find(tx->PoolId);
if (it != MemoryNamedPools.end()) {
it->second->Release(resources.Memory);
if (!tx->PoolId.empty()) {
auto it = MemoryNamedPools.find(tx->MakePoolId());
if (it != MemoryNamedPools.end()) {
it->second->Release(resources.Memory);
}

if (it->second->GetUsed() == 0) {
MemoryNamedPools.erase(it);
}
}
}
}
Expand Down Expand Up @@ -356,9 +366,15 @@ class TKqpResourceManager : public IKqpResourceManager {
if (resources.Memory > 0) {
with_lock (Lock) {
TotalMemoryResource->Release(resources.Memory);
auto it = MemoryNamedPools.find(tx->PoolId);
if (it != MemoryNamedPools.end()) {
it->second->Release(resources.Memory);
if (!tx->PoolId.empty()) {
auto it = MemoryNamedPools.find(tx->MakePoolId());
if (it != MemoryNamedPools.end()) {
it->second->Release(resources.Memory);

if (it->second->GetUsed() == 0) {
MemoryNamedPools.erase(it);
}
}
}
}
}
Expand Down Expand Up @@ -509,7 +525,7 @@ class TKqpResourceManager : public IKqpResourceManager {
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
TActorId ResourceInfoExchanger = TActorId();

absl::flat_hash_map<TString, TIntrusivePtr<TMemoryResource>> MemoryNamedPools;
absl::flat_hash_map<std::pair<TString, TString>, TIntrusivePtr<TMemoryResource>, THash<std::pair<TString, TString>>> MemoryNamedPools;
};

struct TResourceManagers {
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <array>
#include <bitset>
#include <functional>
#include <utility>


namespace NKikimr {
Expand Down Expand Up @@ -115,31 +116,45 @@ class TTxState : public TAtomicRefCount<TTxState> {
TIntrusivePtr<TKqpCounters> Counters;
const TString PoolId;
const double MemoryPoolPercent;
const TString Database;

private:
std::atomic<ui64> TxScanQueryMemory = 0;
std::atomic<ui64> TxExternalDataQueryMemory = 0;
std::atomic<ui32> TxExecutionUnits = 0;

public:
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent)
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent,
const TString& database)
: TxId(txId)
, CreatedAt(now)
, Counters(std::move(counters))
, PoolId(poolId)
, MemoryPoolPercent(memoryPoolPercent)
, Database(database)
{}

std::pair<TString, TString> MakePoolId() const {
return std::make_pair(Database, PoolId);
}

TString ToString() const {
return TStringBuilder() << "TxResourcesInfo{ "
auto res = TStringBuilder() << "TxResourcesInfo{ "
<< "TxId: " << TxId
<< ", PoolId: " << PoolId
<< ", MemoryPoolPercent: " << MemoryPoolPercent
<< ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
<< "Database: " << Database;

if (!PoolId.empty()) {
res << ", PoolId: " << PoolId
<< ", MemoryPoolPercent: " << Sprintf("%.2f", MemoryPoolPercent);
}

res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
<< ", extra allocations " << TxScanQueryMemory.load()
<< ", execution units: " << TxExecutionUnits.load()
<< ", started at: " << CreatedAt
<< " }";

return res;
}

ui64 GetExtraMemoryAllocatedSize() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/rm_service/kqp_rm_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class KqpRm : public TTestBase {
}

TIntrusivePtr<NRm::TTxState> MakeTx(ui64 txId, std::shared_ptr<NRm::IKqpResourceManager> rm) {
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100);
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100, "");
}

TIntrusivePtr<NRm::TTaskState> MakeTask(ui64 taskId, TIntrusivePtr<NRm::TTxState> tx) {
Expand Down
35 changes: 35 additions & 0 deletions ydb/core/kqp/ut/query/kqp_limits_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/library/ydb_issue/proto/issue_id.pb.h>

#include <ydb/core/tablet/resource_broker.h>
#include <util/random/random.h>

namespace NKikimr {
Expand All @@ -11,6 +12,38 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;

using namespace NResourceBroker;

NKikimrResourceBroker::TResourceBrokerConfig MakeResourceBrokerTestConfig() {
NKikimrResourceBroker::TResourceBrokerConfig config;

auto queue = config.AddQueues();
queue->SetName("queue_default");
queue->SetWeight(5);
queue->MutableLimit()->AddResource(4);

queue = config.AddQueues();
queue->SetName("queue_kqp_resource_manager");
queue->SetWeight(20);
queue->MutableLimit()->AddResource(4);
queue->MutableLimit()->AddResource(50'000);

auto task = config.AddTasks();
task->SetName("unknown");
task->SetQueueName("queue_default");
task->SetDefaultDuration(TDuration::Seconds(5).GetValue());

task = config.AddTasks();
task->SetName(NLocalDb::KqpResourceManagerTaskName);
task->SetQueueName("queue_kqp_resource_manager");
task->SetDefaultDuration(TDuration::Seconds(5).GetValue());

config.MutableResourceLimit()->AddResource(10);
config.MutableResourceLimit()->AddResource(100'000);

return config;
}

namespace {
bool IsRetryable(const EStatus& status) {
return status == EStatus::OVERLOADED;
Expand Down Expand Up @@ -133,6 +166,8 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10);
app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000);

app.MutableResourceBrokerConfig()->CopyFrom(MakeResourceBrokerTestConfig());

TKikimrRunner kikimr(app);
CreateLargeTable(kikimr, 0, 0, 0);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ message TEvStartKqpTasksRequest {
optional string SerializedGUCSettings = 8;
optional string SchedulerGroup = 9;
optional double MemoryPoolPercent = 10 [default = 100];
optional string Database = 11;
}

message TEvStartKqpTasksResponse {
Expand Down