diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c06bfb99d37f..94aaa3811ef2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1461,32 +1461,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } protected: - void SetTaskRunner(const TIntrusivePtr& taskRunner) { - TaskRunner = taskRunner; - } - - void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { - YQL_ENSURE(TaskRunner); - - auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit()); - auto* alloc = guard.GetMutex(); - - MemoryQuota->TrySetIncreaseMemoryLimitCallback(alloc); - - TDqTaskRunnerMemoryLimits limits; - limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize; - limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - - TaskRunner->Prepare(Task, limits, execCtx); - - FillIoMaps( - TaskRunner->GetHolderFactory(), - TaskRunner->GetTypeEnv(), - TaskRunner->GetSecureParams(), - TaskRunner->GetTaskParams(), - TaskRunner->GetReadRanges()); - } - void FillIoMaps( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h index aed3dc524015..c70216b9e42a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h @@ -6,19 +6,19 @@ #endif #define CA_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) #define CA_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, this->LogPrefix << s) diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 0d10b8e7c861..8d2ffb00924b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -43,6 +43,32 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase& taskRunner) { + this->TaskRunner = taskRunner; + } + + void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { + YQL_ENSURE(this->TaskRunner); + + auto guard = this->TaskRunner->BindAllocator(this->MemoryQuota->GetMkqlMemoryLimit()); + auto* alloc = guard.GetMutex(); + + this->MemoryQuota->TrySetIncreaseMemoryLimitCallback(alloc); + + TDqTaskRunnerMemoryLimits limits; + limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize; + limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + + this->TaskRunner->Prepare(this->Task, limits, execCtx); + + TBase::FillIoMaps( + this->TaskRunner->GetHolderFactory(), + this->TaskRunner->GetTypeEnv(), + this->TaskRunner->GetSecureParams(), + this->TaskRunner->GetTaskParams(), + this->TaskRunner->GetReadRanges()); + } }; } //namespace NYql::NDq