diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index b2158db7dffe..0714425a6eb5 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -258,7 +258,6 @@ void Init( lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit(); lwmOptions.MkqlMinAllocSize = mkqlAllocSize; lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - *appData->FunctionRegistry, [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) { return lwmOptions.Factory->Get(alloc, task, statsMode); }); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 50ead996d9f9..4c2b2beedfcd 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -113,7 +113,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseCreate( - this, GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); + this, TBase::GetAllocatorPtr(), GetTxId(), Task.GetId(), std::move(inputWithDisabledCheckpointing), InitMemoryQuota()); TaskRunnerActorId = RegisterWithSameMailbox(actor); TDqTaskRunnerMemoryLimits limits; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c06bfb99d37f..321f9e44fe22 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1299,6 +1299,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } protected: + std::shared_ptr GetAllocatorPtr() { + return Alloc; + } NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() { return *Alloc.get(); } diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h index ebd3dd083883..295e5ce693ca 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h @@ -43,13 +43,16 @@ struct ITaskRunnerActorFactory { virtual std::tuple Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr alloc, const TTxId& txId, ui64 taskId, THashSet&& inputChannelsWithDisabledCheckpoints = {}, THolder&& memoryQuota = {}) = 0; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory); +// temporary for YQL-17542 +#define Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1 +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 9cf25192b63a..2cb8e72eeb0b 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -38,9 +38,9 @@ class TLocalTaskRunnerActor public: static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER"; - TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet&& inputChannelsWithDisabledCheckpoints, THolder&& memoryQuota) + TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, std::shared_ptr alloc, const TTxId& txId, ui64 taskId, THashSet&& inputChannelsWithDisabledCheckpoints, THolder&& memoryQuota) : TActor(&TLocalTaskRunnerActor::Handler) - , FuncRegistry(funcRegistry) + , Alloc(alloc) , Parent(parent) , Factory(factory) , TxId(txId) @@ -48,12 +48,6 @@ class TLocalTaskRunnerActor , InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints)) , MemoryQuota(std::move(memoryQuota)) { - Alloc = std::make_unique( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), - FuncRegistry.SupportsSizedAllocators(), - false - ); } ~TLocalTaskRunnerActor() @@ -471,8 +465,7 @@ class TLocalTaskRunnerActor THolder GetError(const TString& message) { return MakeHolder(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)}); } - const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; - std::unique_ptr Alloc; + std::shared_ptr Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; @@ -487,19 +480,19 @@ class TLocalTaskRunnerActor }; struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { - TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) + TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) : Factory(factory) - , FuncRegistry(funcRegistry) { } std::tuple Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr alloc, const TTxId& txId, ui64 taskId, THashSet&& inputChannelsWithDisabledCheckpoints, THolder&& memoryQuota) override { - auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); + auto* actor = new TLocalTaskRunnerActor(parent, Factory, alloc, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota)); return std::make_tuple( static_cast(actor), static_cast(actor) @@ -507,12 +500,11 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { } TTaskRunnerFactory Factory; - const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry; }; -ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory) +ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory) { - return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory)); + return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 0d1e760b2ee8..a4588de0a19b 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -253,7 +253,16 @@ class TDqWorker: public TRichActor } NActors::IActor* actor; - std::tie(Actor, actor) = TaskRunnerActorFactory->Create(this, TraceId, Task.GetId()); + std::tie(Actor, actor) = TaskRunnerActorFactory->Create( + this, + std::make_shared( + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), + true, + false + ), + TraceId, + Task.GetId()); TaskRunnerActor = RegisterLocalChild(actor); TDqTaskRunnerMemoryLimits limits; // used for local mode only limits.ChannelBufferSize = 20_MB; diff --git a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp index 2f4f4ed03252..07438e01c698 100644 --- a/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp +++ b/ydb/library/yql/providers/dq/global_worker_manager/global_worker_manager_ut.cpp @@ -182,7 +182,7 @@ class TGlobalWorkerManagerTest: public TTestBase { NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory( - lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, nullptr); + lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); auto localWM = CreateLocalWorkerManager(lwmOptions); ActorRuntime_->AddLocalService(MakeWorkerManagerActorID(NodeId(i)), TActorSetupCmd{localWM, TMailboxType::Simple, 0}, i); diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 06b76ae2129a..ac5552080c78 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -67,7 +67,6 @@ class TLocalServiceHolder { lwmOptions.FunctionRegistry = functionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( - *functionRegistry, [factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { return factory->Get(alloc, task, statsMode); diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 771e27e764ad..55f844a07663 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -191,18 +191,18 @@ class TTaskRunnerActor TTaskRunnerActor( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr alloc, const NTaskRunnerProxy::IProxyFactory::TPtr& factory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, const ITaskRunnerInvoker::TPtr& invoker, const TTxId& txId, ui64 taskId, TWorkerRuntimeData* runtimeData) : TActor(&TTaskRunnerActor::Handler) , Parent(parent) + , Alloc(alloc) , TraceId(TStringBuilder() << txId) , TaskId(taskId) , Factory(factory) - , FuncRegistry(funcRegistry) , Invoker(invoker) , Local(Invoker->IsLocal()) , Settings(MakeIntrusive()) @@ -608,14 +608,6 @@ class TTaskRunnerActor StageId = taskMeta.GetStageId(); NDq::TDqTaskSettings settings(&ev->Get()->Task); - YQL_ENSURE(!Alloc); - YQL_ENSURE(FuncRegistry); - Alloc = std::make_unique( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), - FuncRegistry->SupportsSizedAllocators(), - false - ); TaskRunner = Factory->GetOld(*Alloc.get(), settings, TraceId); } catch (...) { TString message = "Could not create TaskRunner for " + ToString(taskId) + " on node " + ToString(replyTo.NodeId()) + ", error: " + CurrentExceptionMessage(); @@ -746,13 +738,12 @@ class TTaskRunnerActor } } - std::unique_ptr Alloc; NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; + std::shared_ptr Alloc; const TString TraceId; const ui64 TaskId; NTaskRunnerProxy::IProxyFactory::TPtr Factory; - const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; NTaskRunnerProxy::ITaskRunner::TPtr TaskRunner; ITaskRunnerInvoker::TPtr Invoker; bool Local; @@ -773,22 +764,21 @@ class TTaskRunnerActorFactory: public ITaskRunnerActorFactory { TTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, - TWorkerRuntimeData* runtimeData) + TWorkerRuntimeData* runtimeData) : ProxyFactory(proxyFactory) , InvokerFactory(invokerFactory) - , FuncRegistry(funcRegistry) , RuntimeData(runtimeData) { } std::tuple Create( ITaskRunnerActor::ICallbacks* parent, + std::shared_ptr alloc, const TTxId& txId, ui64 taskId, THashSet&&, THolder&&) override { - auto* actor = new TTaskRunnerActor(parent, ProxyFactory, FuncRegistry, InvokerFactory->Create(), txId, taskId, RuntimeData); + auto* actor = new TTaskRunnerActor(parent, alloc, ProxyFactory, InvokerFactory->Create(), txId, taskId, RuntimeData); return std::make_tuple( static_cast(actor), static_cast(actor) @@ -798,17 +788,15 @@ class TTaskRunnerActorFactory: public ITaskRunnerActorFactory { private: NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory; NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory; - const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; TWorkerRuntimeData* RuntimeData; }; ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData) { - return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, funcRegistry, runtimeData)); + return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h index 9439364618b7..260b2a7e0de8 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h @@ -29,7 +29,6 @@ namespace NTaskRunnerActor { ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, - const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, TWorkerRuntimeData* runtimeData = nullptr); } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp index 16d0e1c49ac7..9e1a9e56f9c0 100644 --- a/ydb/library/yql/tools/dq/worker_node/main.cpp +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -400,11 +400,11 @@ int main(int argc, char** argv) { : TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity)); YQL_ENSURE(functionRegistry); lwmOptions.TaskRunnerActorFactory = disablePipe - ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(*functionRegistry.Get(), [=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) + ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& ) { return lwmOptions.Factory->Get(alloc, task, statsMode); }) - : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory, functionRegistry.Get()); + : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); lwmOptions.ComputeActorOwnsCounters = true; bool enableSpilling = res.Has("enable-spilling"); auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);