Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
*appData->FunctionRegistry,
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(alloc, task, statsMode);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}
std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create(
this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota());
TaskRunnerActorId = RegisterWithSameMailbox(actor);

TDqTaskRunnerMemoryLimits limits;
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

protected:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() {
return Alloc;
}
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() {
return *Alloc.get();
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ struct ITaskRunnerActorFactory {

virtual std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints = {},
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
};

ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory);
// temporary for YQL-17542
#define Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory);

} // namespace NTaskRunnerActor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,16 @@ class TLocalTaskRunnerActor
public:
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";

TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
, FuncRegistry(funcRegistry)
, Alloc(alloc)
, Parent(parent)
, Factory(factory)
, TxId(txId)
, TaskId(taskId)
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
, MemoryQuota(std::move(memoryQuota))
{
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FuncRegistry.SupportsSizedAllocators(),
false
);
}

~TLocalTaskRunnerActor()
Expand Down Expand Up @@ -471,8 +465,7 @@ class TLocalTaskRunnerActor
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)});
}
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;

NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
Expand All @@ -487,32 +480,31 @@ class TLocalTaskRunnerActor
};

struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
: Factory(factory)
, FuncRegistry(funcRegistry)
{ }

std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
{
auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
);
}

TTaskRunnerFactory Factory;
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
};

ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
{
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory));
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory));
}

} // namespace NTaskRunnerActor
Expand Down
11 changes: 10 additions & 1 deletion ydb/library/yql/providers/dq/actors/worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,16 @@ class TDqWorker: public TRichActor<TDqWorker>
}

NActors::IActor* actor;
std::tie(Actor, actor) = TaskRunnerActorFactory->Create(this, TraceId, Task.GetId());
std::tie(Actor, actor) = TaskRunnerActorFactory->Create(
this,
std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вкусовщина, но я бы добавил inline комменты с именами параметров для этих булевых значений.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

И еще вопрос, хотя worker_actor планируется вообще выпилить. но FuncRegistry.SupportsSizedAllocators() всегда true выдает?

Copy link
Collaborator Author

@zverevgeny zverevgeny Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не знаю, но не стал тут усложнять. Тесты проходят, а до конца недели я надеюсь выпилить TDqWorkerActor. Уже даже PR есть: https://github.com/ydb-platform/ydb/pulls/zverevgeny

false
),
TraceId,
Task.GetId());
TaskRunnerActor = RegisterLocalChild(actor);
TDqTaskRunnerMemoryLimits limits; // used for local mode only
limits.ChannelBufferSize = 20_MB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class TGlobalWorkerManagerTest: public TTestBase {
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory(
lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr);
lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
auto localWM = CreateLocalWorkerManager(lwmOptions);
ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)),
TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class TLocalServiceHolder {
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
*functionRegistry,
[factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return factory->Get(alloc, task, statsMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,18 @@ class TTaskRunnerActor

TTaskRunnerActor(
ITaskRunnerActor::ICallbacks* parent,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NTaskRunnerProxy::IProxyFactory::TPtr& factory,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
const ITaskRunnerInvoker::TPtr& invoker,
const TTxId& txId,
ui64 taskId,
TWorkerRuntimeData* runtimeData)
: TActor<TTaskRunnerActor>(&TTaskRunnerActor::Handler)
, Parent(parent)
, Alloc(alloc)
, TraceId(TStringBuilder() << txId)
, TaskId(taskId)
, Factory(factory)
, FuncRegistry(funcRegistry)
, Invoker(invoker)
, Local(Invoker->IsLocal())
, Settings(MakeIntrusive<TDqConfiguration>())
Expand Down Expand Up @@ -608,14 +608,6 @@ class TTaskRunnerActor
StageId = taskMeta.GetStageId();

NDq::TDqTaskSettings settings(&ev->Get()->Task);
YQL_ENSURE(!Alloc);
YQL_ENSURE(FuncRegistry);
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FuncRegistry->SupportsSizedAllocators(),
false
);
TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId);
} catch (...) {
TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage();
Expand Down Expand Up @@ -746,13 +738,12 @@ class TTaskRunnerActor
}
}

std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
const TString TraceId;
const ui64 TaskId;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner;
ITaskRunnerInvoker::TPtr Invoker;
bool Local;
Expand All @@ -773,22 +764,21 @@ class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {
TTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData)
TWorkerRuntimeData* runtimeData)
: ProxyFactory(proxyFactory)
, InvokerFactory(invokerFactory)
, FuncRegistry(funcRegistry)
, RuntimeData(runtimeData)
{ }

std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
ITaskRunnerActor::ICallbacks* parent,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TTxId& txId,
ui64 taskId,
THashSet<ui32>&&,
THolder<NYql::NDq::TDqMemoryQuota>&&) override
{
auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData);
auto* actor = new TTaskRunnerActor(parent, alloc, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData);
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
Expand All @@ -798,17 +788,15 @@ class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {
private:
NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory;
NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
TWorkerRuntimeData* RuntimeData;
};

ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData)
{
return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData));
return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData));
}

} // namespace NTaskRunnerActor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace NTaskRunnerActor {
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
TWorkerRuntimeData* runtimeData = nullptr);

} // namespace NTaskRunnerActor
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/tools/dq/worker_node/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,11 @@ int main(int argc, char** argv) {
: TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity));
YQL_ENSURE(functionRegistry);
lwmOptions.TaskRunnerActorFactory = disablePipe
? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return lwmOptions.Factory->Get(alloc, task, statsMode);
})
: NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get());
: NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
lwmOptions.ComputeActorOwnsCounters = true;
bool enableSpilling = res.Has("enable-spilling");
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
Expand Down