diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index bc5bb4647979..2633037dcb17 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -15,7 +15,8 @@ using namespace NYql::NDq; class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { public: TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp) - : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp)) + : TDqTaskRunnerExecutionContext(txId, std::move(wakeUp)) + , WithSpilling_(withSpilling) { } @@ -26,6 +27,13 @@ class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { { return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); } + + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override { + return TDqTaskRunnerExecutionContext::CreateChannelStorage(channelId, WithSpilling_ || withSpilling); + } + +private: + bool WithSpilling_; }; } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 1dfe1fbab6ab..bf16d3b1d135 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -73,8 +73,7 @@ void TKqpComputeActor::DoBootstrap() { auto wakeup = [this]{ ContinueExecute(); }; try { - PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get(TxId), RuntimeSettings.UseSpilling, - std::move(wakeup))); + PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get(TxId), RuntimeSettings.UseSpilling, std::move(wakeup))); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); return; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 78bcd5c52c54..227120f6a40c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -626,7 +626,7 @@ class TKqpExecuterBase : public TActorBootstrapped { auto& record = channelsInfoEv->Record; for (auto& channelId : channelIds) { - FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion); + FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false); } LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size()); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 2aa6c6412727..1e9f2c00900d 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -59,8 +59,8 @@ constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, - bool withSpilling, const TMaybe& rlPath, NWilson::TSpan& executerSpan, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, + const TMaybe& rlPath, NWilson::TSpan& executerSpan, TVector&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, @@ -289,7 +289,7 @@ std::unique_ptr TKqpPlanner::AssignTasksToNodes() { auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations); THashMap alreadyAssigned; - for(auto& [nodeId, tasks] : TasksPerNode) { + for(auto& [nodeId, tasks] : TasksPerNode) { for(ui64 taskId: tasks) { alreadyAssigned.emplace(taskId, nodeId); } @@ -366,7 +366,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op task.ComputeActorId = computeActorId; LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); - + auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); YQL_ENSURE(result.second); } @@ -409,7 +409,7 @@ std::unique_ptr TKqpPlanner::PlanExecution() { } ComputeTasks.clear(); } - + if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && AllowSinglePartitionOpt) { // query affects a single key or shard, so it might be more effective // to execute this task locally so we can avoid useless overhead for remote task launching. @@ -518,8 +518,8 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { std::unique_ptr CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, - bool withSpilling, const TMaybe& rlPath, NWilson::TSpan& executerSpan, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, + const TMaybe& rlPath, NWilson::TSpan& executerSpan, TVector&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization, const TIntrusivePtr& userRequestContext) diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 3d0f7f7fa9f5..49e988927fd0 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -43,8 +43,8 @@ class TKqpPlanner { public: TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, - bool withSpilling, const TMaybe& rlPath, NWilson::TSpan& ExecuterSpan, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, + const TMaybe& rlPath, NWilson::TSpan& ExecuterSpan, TVector&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool allowSinglePartitionOpt, const TIntrusivePtr& userRequestContext); @@ -63,7 +63,7 @@ class TKqpPlanner { ui32 GetnComputeTasks(); private: - + const IKqpGateway::TKqpSnapshot& GetSnapshot() const; void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool optimizeProtoForLocalExecution); void PrepareToProcess(); @@ -114,8 +114,8 @@ class TKqpPlanner { std::unique_ptr CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr& userToken, TInstant deadline, - const Ydb::Table::QueryStatsCollection::Mode& statsMode, - bool withSpilling, const TMaybe& rlPath, NWilson::TSpan& executerSpan, + const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, + const TMaybe& rlPath, NWilson::TSpan& executerSpan, TVector&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig, bool useDataQueryPool, bool localComputeTasks, diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 0f9ff8525c00..0d6151fcbc3e 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -760,12 +760,13 @@ void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) { } void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NDqProto::TChannel& channelDesc, const TChannel& channel, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion) { + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling) { channelDesc.SetId(channel.Id); channelDesc.SetSrcStageId(channel.SrcStageId.StageId); channelDesc.SetDstStageId(channel.DstStageId.StageId); channelDesc.SetSrcTaskId(channel.SrcTask); channelDesc.SetDstTaskId(channel.DstTask); + channelDesc.SetEnableSpilling(enableSpilling); const auto& resultChannelProxies = tasksGraph.GetMeta().ResultChannelProxies; @@ -955,7 +956,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto } } -void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { +void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling) { switch (output.Type) { case TTaskOutputType::Map: YQL_ENSURE(output.Channels.size() == 1); @@ -1015,7 +1016,7 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutpu for (auto& channel : output.Channels) { auto& channelDesc = *outputDesc.AddChannels(); - FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion); + FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, enableSpilling); } } @@ -1067,7 +1068,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& for (ui64 channel : input.Channels) { auto& channelDesc = *inputDesc.AddChannels(); - FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion); + FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, false); } if (input.Transform) { @@ -1116,8 +1117,12 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N FillInputDesc(tasksGraph, *result->AddInputs(), input, serializeAsyncIoSettings); } + bool enableSpilling = false; + if (task.Outputs.size() > 1) { + enableSpilling = AppData()->EnableKqpSpilling; + } for (const auto& output : task.Outputs) { - FillOutputDesc(tasksGraph, *result->AddOutputs(), output); + FillOutputDesc(tasksGraph, *result->AddOutputs(), output, enableSpilling); } const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 9bfd88c64576..1a68edf17d16 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -256,8 +256,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings); void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings); void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta); -void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, - const NYql::NDq::TChannel& channel, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion); +void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel, + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling); template TVector BuildKqpColumns(const Proto& op, TIntrusiveConstPtr tableInfo) { diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index b80353756869..4a8af81d46dd 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1105,11 +1105,11 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); } - NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override { + NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */) const override { return {}; } - NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override { + NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override { return {}; } }; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index ef5d7768cff6..f9664ff53a68 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -84,8 +84,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>); auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; - std::shared_ptr execCtx = std::make_shared( - TxId, RuntimeSettings.UseSpilling, std::move(wakeup)); + std::shared_ptr execCtx = std::make_shared(TxId, std::move(wakeup)); Send(TaskRunnerActorId, new NTaskRunnerActor::TEvTaskRunnerCreate( diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 1811bb4b171f..14810f7649a0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -57,7 +57,7 @@ class TDqComputeActor : public TDqComputeActorBase { auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger); SetTaskRunner(taskRunner); auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; - TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup)); + TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup)); PrepareTaskRunner(execCtx); ContinueExecute(EResumeSource::CABootstrap); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 5aac45099870..ba0e2040d1cc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -22,7 +22,7 @@ struct TEvDqCompute { struct TEvStateRequest : public NActors::TEventPB {}; struct TEvResumeExecution : public NActors::TEventLocal { - TEvResumeExecution(EResumeSource source) + TEvResumeExecution(EResumeSource source) : Source(source) { } diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp index 9ce940a36aa5..18329312a95f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -6,19 +6,18 @@ namespace NYql { namespace NDq { -TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp) +TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp) : TxId_(txId) , WakeUp_(std::move(wakeUp)) - , WithSpilling_(withSpilling) { } -IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const { - return CreateChannelStorage(channelId, nullptr, false); +IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling) const { + return CreateChannelStorage(channelId, withSpilling, nullptr, false); } -IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const { - if (WithSpilling_) { +IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const { + if (withSpilling) { return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent); } else { return nullptr; diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h index 0dedb3434bfd..f8dee8625013 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -9,15 +9,14 @@ namespace NDq { class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { public: - TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp); + TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp); - IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; - IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override; + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override; + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const override; private: const TTxId TxId_; const IDqChannelStorage::TWakeUpCallback WakeUp_; - const bool WithSpilling_; }; } // namespace NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp index d64a49066170..cf667219c969 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -18,18 +18,53 @@ using namespace NActors; namespace { -#define LOG_D(s) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) -#define LOG_I(s) \ - LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_E(s) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) -#define LOG_C(s) \ - LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_W(s) \ - LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_T(s) \ - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) +#define LOG_D(s) { \ + if (ActorSystem_) { \ + LOG_DEBUG_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } else { \ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } \ +} + +#define LOG_I(s) { \ + if (ActorSystem_) { \ + LOG_INFO_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } else { \ + LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } \ +} + +#define LOG_E(s) { \ + if (ActorSystem_) { \ + LOG_ERROR_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } else { \ + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } \ +} + +#define LOG_C(s) { \ + if (ActorSystem_) { \ + LOG_CRIT_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } else { \ + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } \ +} + +#define LOG_W(s) { \ + if (ActorSystem_) { \ + LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } else { \ + LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \ + } \ +} + +#define LOG_T(s) { \ + if (ActorSystem_) { \ + LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } else { \ + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \ + } \ +} constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp index 6433b54a16e1..9975e7d7f21e 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp @@ -77,7 +77,7 @@ class TDqLocalFileSpillingActor : public TActorBootstrappedSender); - Send(ServiceActorId_, ev->Release().Release()); + Send(ServiceActorId_, ev->Release().Release(), NActors::IEventHandle::FlagTrackDelivery); } void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { @@ -109,7 +110,7 @@ class TDqLocalFileSpillingActor : public TActorBootstrappedSender); - Send(ServiceActorId_, ev->Release().Release()); + Send(ServiceActorId_, ev->Release().Release(), NActors::IEventHandle::FlagTrackDelivery); } void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { @@ -125,17 +126,21 @@ class TDqLocalFileSpillingActor : public TActorBootstrappedSender); - Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile); + Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile, NActors::IEventHandle::FlagTrackDelivery); PassAway(); } + void HandleUndelivered() { + Send(ClientActorId_, new TEvDqSpilling::TEvError("Spilling Service not started")); + } + private: void ValidateSender(const TActorId& sender) { YQL_ENSURE(ClientActorId_ == sender, "" << ClientActorId_ << " != " << sender); } void ClientLost() { - Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost")); + Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost"), NActors::IEventHandle::FlagTrackDelivery); PassAway(); } diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp index da84b2e06634..2752dc811750 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp @@ -377,12 +377,10 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { } Y_UNIT_TEST(ReadError) { - //return; - TTestActorRuntime runtime; runtime.Initialize(); - runtime.StartSpillingService(); + auto spillingSvc = runtime.StartSpillingService(); auto tester = runtime.AllocateEdgeActor(); auto spillingActor = runtime.StartSpillingActor(tester); @@ -396,15 +394,16 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId); } - (runtime.GetSpillingRoot() / "node_1" / "1_test_0").ForceDelete(); + auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId())); + (runtime.GetSpillingRoot() / nodePath / "1_test_0").ForceDelete(); { auto ev = new TEvDqSpilling::TEvRead(0, true); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent(tester); - auto& err = resp->Get()->Message; - auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/node_1/1_test_0\" with mode RdOnly"; + auto err = resp->Get()->Message; + auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/" + nodePath.GetPath() +"/1_test_0\" with mode RdOnly"; UNIT_ASSERT_C(err.Contains("No such file or directory"), err); UNIT_ASSERT_C(err.Contains(expected), err); } @@ -451,6 +450,34 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { } } + Y_UNIT_TEST(NoSpillingService) { + TTestActorRuntime runtime; + runtime.Initialize(); + + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + // put blob 1 + { + auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent(tester, TDuration::Seconds(1)); + UNIT_ASSERT_EQUAL("Spilling Service not started", resp->Get()->Message); + } + + // get blob 1 + { + auto ev = new TEvDqSpilling::TEvRead(1); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent(tester, TDuration::Seconds(1)); + UNIT_ASSERT_EQUAL("Spilling Service not started", resp->Get()->Message); + } + } + } // suite } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index a3a6dd91bcd1..942bb476e0d1 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -83,6 +83,7 @@ message TChannel { bool InMemory = 8; ECheckpointingMode CheckpointingMode = 9; EWatermarksMode WatermarksMode = 10; + bool EnableSpilling = 13; } message TUnionAllInput { diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 4fa8b13063c9..f0c7ee5b653c 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -643,7 +643,7 @@ class TDqTaskRunner : public IDqTaskRunner { settings.Level = StatsModeToCollectStatsLevel(Settings.StatsMode); if (!outputChannelDesc.GetInMemory()) { - settings.ChannelStorage = execCtx.CreateChannelStorage(channelId); + settings.ChannelStorage = execCtx.CreateChannelStorage(channelId, outputChannelDesc.GetEnableSpilling()); } auto outputChannel = CreateDqOutputChannel(channelId, outputChannelDesc.GetDstStageId(), *taskOutputType, holderFactory, settings, LogFunc); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index b50f1ba9045f..6cd08d8156e2 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -136,8 +136,8 @@ class IDqTaskRunnerExecutionContext { const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector&& outputs) const = 0; - virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0; - virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const = 0; + virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const = 0; + virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const = 0; }; class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { @@ -151,11 +151,11 @@ class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContextBase { public: - IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override { + IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, bool /*withSpilling*/) const override { return {}; }; - IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/, bool /*isConcurrent*/) const override { + IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, bool /*withSpilling*/, NActors::TActorSystem* /*actorSystem*/, bool /*isConcurrent*/) const override { return {}; }; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index ea95feb7da52..b5d251400244 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -23,8 +23,7 @@ IActor* CreateComputeActor( const TString& computeActorType, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, - NDqProto::EDqStatsMode statsMode, - bool enableSpilling) + NDqProto::EDqStatsMode statsMode) { auto memoryLimits = NDq::TComputeMemoryLimits(); memoryLimits.ChannelBufferSize = 1000000; @@ -41,7 +40,6 @@ IActor* CreateComputeActor( computeRuntimeSettings.ExtraMemoryAllocationPool = 3; computeRuntimeSettings.FailOnUndelivery = false; computeRuntimeSettings.StatsMode = (statsMode != NDqProto::DQ_STATS_MODE_UNSPECIFIED) ? statsMode : NDqProto::DQ_STATS_MODE_FULL; - computeRuntimeSettings.UseSpilling = enableSpilling; computeRuntimeSettings.AsyncInputPushLimit = 64_MB; // clear fake actorids diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h index 39c239245fca..807e2d24c1f5 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.h +++ b/ydb/library/yql/providers/dq/actors/compute_actor.h @@ -14,7 +14,6 @@ NActors::IActor* CreateComputeActor( const TString& computeActorType, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, - NDqProto::EDqStatsMode statsMode, - bool enableSpilling); + NDqProto::EDqStatsMode statsMode); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 989e4b3e65bf..411f68af5c3e 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -188,7 +188,6 @@ class TDqExecuter: public TRichActor, NYql::TCounters { const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync"); - bool enableSpilling = Settings->SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != TDqSettings::ESpillingEngine::Disable; auto resourceAllocator = RegisterChild(CreateResourceAllocator( GwmActorId, SelfId(), ControlId, workerCount, @@ -196,14 +195,12 @@ class TDqExecuter: public TRichActor, NYql::TCounters { Counters, enableComputeActor ? tasks : TVector(), computeActorType, - StatsMode, - enableSpilling)); + StatsMode)); auto allocateRequest = MakeHolder(workerCount, Username); allocateRequest->Record.SetTraceId(TraceId); allocateRequest->Record.SetCreateComputeActor(enableComputeActor); allocateRequest->Record.SetComputeActorType(computeActorType); allocateRequest->Record.SetStatsMode(StatsMode); - allocateRequest->Record.SetEnableSpilling(enableSpilling); if (enableComputeActor) { ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId()); } diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp index ebbd30f2a591..d272c30711de 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp @@ -51,8 +51,7 @@ class TResourceAllocator: public TRichActor const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TVector& tasks, const TString& computeActorType, - NDqProto::EDqStatsMode statsMode, - bool enableSpilling) + NDqProto::EDqStatsMode statsMode) : TRichActor(&TResourceAllocator::Handle) , GwmActor(gwmActor) , SenderId(senderId) @@ -67,7 +66,6 @@ class TResourceAllocator: public TRichActor , Tasks(tasks) , ComputeActorType(computeActorType) , StatsMode(statsMode) - , UseSpilling(enableSpilling) { AllocatedWorkers.resize(workerCount); if (!Tasks.empty()) { @@ -258,7 +256,6 @@ class TResourceAllocator: public TRichActor *request->Record.AddTask() = node.Task; } request->Record.SetStatsMode(StatsMode); - request->Record.SetEnableSpilling(UseSpilling); YQL_CLOG(WARN, ProviderDq) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0); if (backoff) { TActivationContext::Schedule(backoff, new IEventHandle( @@ -338,7 +335,6 @@ class TResourceAllocator: public TRichActor TVector Tasks; // for compute actor const TString ComputeActorType; NDqProto::EDqStatsMode StatsMode; - bool UseSpilling; }; NActors::IActor* CreateResourceAllocator( @@ -351,10 +347,9 @@ NActors::IActor* CreateResourceAllocator( const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TVector& tasks, const TString& computeActorType, - NDqProto::EDqStatsMode statsMode, - bool enableSpilling) + NDqProto::EDqStatsMode statsMode) { - return new TResourceAllocator(gwmActor, senderId, controlId, size, traceId, settings, counters, tasks, computeActorType, statsMode, enableSpilling); + return new TResourceAllocator(gwmActor, senderId, controlId, size, traceId, settings, counters, tasks, computeActorType, statsMode); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.h b/ydb/library/yql/providers/dq/actors/resource_allocator.h index fe08a39c7fb5..127442a6f5d0 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.h +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.h @@ -18,6 +18,5 @@ namespace NYql { const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TVector& tasks = {}, const TString& computeActorType = "old", - NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_UNSPECIFIED, - bool enableSpilling = false); + NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_UNSPECIFIED); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 96e69b23d083..0d1e760b2ee8 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -94,15 +94,13 @@ class TDqWorker: public TRichActor const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const IDqAsyncIoFactory::TPtr& asyncIoFactory, TWorkerRuntimeData* runtimeData, - const TString& traceId, - bool useSpilling) + const TString& traceId) : TRichActor(&TDqWorker::Handler) , AsyncIoFactory(asyncIoFactory) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) , MemoryQuotaManager(new TDummyMemoryQuotaManager) - , UseSpilling(useSpilling) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker created "; @@ -262,8 +260,7 @@ class TDqWorker: public TRichActor limits.OutputChunkMaxSize = 2_MB; auto wakeup = [this]{ ResumeExecution(EResumeSource::Default); }; - std::shared_ptr execCtx = std::make_shared( - TraceId, UseSpilling, std::move(wakeup)); + std::shared_ptr execCtx = std::make_shared(TraceId, std::move(wakeup)); Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, NDqProto::DQ_STATS_MODE_BASIC, execCtx)); } @@ -796,15 +793,13 @@ class TDqWorker: public TRichActor TVector AllWorkers; IMemoryQuotaManager::TPtr MemoryQuotaManager; - const bool UseSpilling; }; NActors::IActor* CreateWorkerActor( TWorkerRuntimeData* runtimeData, const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqAsyncIoFactory::TPtr& asyncIoFactory, - bool useSpilling) + const IDqAsyncIoFactory::TPtr& asyncIoFactory) { Y_ABORT_UNLESS(taskRunnerActorFactory); return new TLogWrapReceive( @@ -812,8 +807,7 @@ NActors::IActor* CreateWorkerActor( taskRunnerActorFactory, asyncIoFactory, runtimeData, - traceId, - useSpilling), traceId); + traceId), traceId); } } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h index 54dc9cf34d36..438578fd97c3 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -25,7 +25,6 @@ namespace NYql::NDqs { TWorkerRuntimeData* runtimeData, const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory, - bool useSpilling); + const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 69c6b0441d4e..37a7449d305b 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -39,7 +39,7 @@ message TAllocateWorkersRequest { uint64 FreeWorkerAfterMs = 14; NYql.NDqProto.EDqStatsMode StatsMode = 16; - bool EnableSpilling = 17; // false + reserved 17; } message TWorkerGroup { diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index c0284e015c96..b50224a76e67 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -204,6 +204,10 @@ struct TDqSettings { return fastPickle ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; } } + + bool IsSpillingEnabled() const { + return SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != ESpillingEngine::Disable; + } }; struct TDqConfiguration: public TDqSettings, public NCommon::TSettingDispatcher { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 372b7be9fcdb..5fbe840b6077 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -73,7 +73,6 @@ class TLocalServiceHolder { }); lwmOptions.Counters = NDqs::TWorkerManagerCounters(lwmGroup); lwmOptions.DropTaskCountersOnFinish = false; - lwmOptions.UseSpilling = withSpilling; auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); ServiceNode->AddLocalService( diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index c2605765e82b..b265d529d904 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -443,8 +443,12 @@ namespace NYql::NDqs { } } + bool enableSpilling = false; + if (task.Outputs.size() > 1) { + enableSpilling = Settings->IsSpillingEnabled(); + } for (auto& output : task.Outputs) { - FillOutputDesc(*taskDesc.AddOutputs(), output); + FillOutputDesc(*taskDesc.AddOutputs(), output, enableSpilling); } auto& program = *taskDesc.MutableProgram(); @@ -643,7 +647,7 @@ void TDqsExecutionPlanner::BuildAllPrograms() { } } - void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) { + void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel, bool enableSpilling) { channelDesc.SetId(channel.Id); channelDesc.SetSrcStageId(std::get<2>(StagePrograms[channel.SrcStageId])); channelDesc.SetDstStageId(std::get<2>(StagePrograms[channel.DstStageId])); @@ -651,6 +655,7 @@ void TDqsExecutionPlanner::BuildAllPrograms() { channelDesc.SetDstTaskId(channel.DstTask); channelDesc.SetCheckpointingMode(channel.CheckpointingMode); channelDesc.SetTransportVersion(Settings->GetDataTransportVersion()); + channelDesc.SetEnableSpilling(enableSpilling); if (channel.SrcTask) { NActors::ActorIdToProto(TasksGraph.GetTask(channel.SrcTask).ComputeActorId, @@ -691,11 +696,11 @@ void TDqsExecutionPlanner::BuildAllPrograms() { for (ui64 channel : input.Channels) { auto& channelDesc = *inputDesc.AddChannels(); - FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); + FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel), /*enableSpilling*/false); } } - void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { + void TDqsExecutionPlanner::FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling) { switch (output.Type) { case TTaskOutputType::Map: YQL_ENSURE(output.Channels.size() == 1); @@ -736,7 +741,7 @@ void TDqsExecutionPlanner::BuildAllPrograms() { for (auto& channel : output.Channels) { auto& channelDesc = *outputDesc.AddChannels(); - FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); + FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel), enableSpilling); } if (output.Transform) { diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h index e60a9a5fb212..efd59c6b9ae6 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.h +++ b/ydb/library/yql/providers/dq/planner/execution_planner.h @@ -68,9 +68,9 @@ namespace NYql::NDqs { bool BuildReadStage(const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback); void BuildConnections(const NNodes::TDqPhyStage& stage); void BuildAllPrograms(); - void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel); + void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel, bool enableSpilling); void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input); - void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); + void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling); void GatherPhyMapping(THashMap, TString>& clusters, THashMap, TString>& tables); void BuildCheckpointingAndWatermarksMode(bool enableCheckpoints, bool enableWatermarks); diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 093ffbca84d0..5114d0300c6d 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -456,7 +456,7 @@ class TTaskRunnerActor auto toPop = ev->Get()->Size; ui64 channelId = ev->Get()->ChannelId; - TSpillingStorageInfo::TPtr spillingStorageInfo = GetSpillingStorage(channelId, actorSystem); + TSpillingStorageInfo::TPtr spillingStorageInfo = GetSpillingStorage(channelId); Invoker->Invoke([spillingStorageInfo, cookie, selfId, channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() { try { @@ -572,8 +572,10 @@ class TTaskRunnerActor auto cookie = ev->Cookie; auto taskId = ev->Get()->Task.GetId(); auto& inputs = ev->Get()->Task.GetInputs(); + auto& outputs = ev->Get()->Task.GetOutputs(); auto startTime = TInstant::Now(); ExecCtx = ev->Get()->ExecCtx; + auto* actorSystem = TActivationContext::ActorSystem(); for (auto inputId = 0; inputId < inputs.size(); inputId++) { auto& input = inputs[inputId]; @@ -585,6 +587,14 @@ class TTaskRunnerActor } } } + + for (auto outputId = 0; outputId < outputs.size(); outputId++) { + auto& channels = outputs[outputId].GetChannels(); + for (auto& channel : channels) { + CreateSpillingStorage(channel.GetId(), actorSystem, channel.GetEnableSpilling()); + } + } + ParentId = ev->Sender; try { @@ -603,7 +613,6 @@ class TTaskRunnerActor return; } - auto* actorSystem = TActivationContext::ActorSystem(); Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){ try { //auto guard = taskRunner->BindAllocator(); // only for local mode @@ -705,20 +714,26 @@ class TTaskRunnerActor }); } - TSpillingStorageInfo::TPtr GetSpillingStorage(ui64 channelId, TActorSystem* actorSystem) { + TSpillingStorageInfo::TPtr GetSpillingStorage(ui64 channelId) { + TSpillingStorageInfo::TPtr spillingStorage = nullptr; + auto spillingIt = SpillingStoragesInfos.find(channelId); + if (spillingIt != SpillingStoragesInfos.end()) { + spillingStorage = spillingIt->second; + } + return spillingStorage; + } + + void CreateSpillingStorage(ui64 channelId, TActorSystem* actorSystem, bool enableSpilling) { TSpillingStorageInfo::TPtr spillingStorageInfo = nullptr; - auto channelStorage = ExecCtx->CreateChannelStorage(channelId, actorSystem, true /*isConcurrent*/); + auto channelStorage = ExecCtx->CreateChannelStorage(channelId, enableSpilling, actorSystem, true /*isConcurrent*/); if (channelStorage) { auto spillingIt = SpillingStoragesInfos.find(channelId); - if (spillingIt == SpillingStoragesInfos.end()) { - TSpillingStorageInfo* info = new TSpillingStorageInfo(channelStorage, channelId); - spillingIt = SpillingStoragesInfos.emplace(channelId, info).first; - } - spillingStorageInfo = spillingIt->second; - } + YQL_ENSURE(spillingIt == SpillingStoragesInfos.end()); - return spillingStorageInfo; + TSpillingStorageInfo* info = new TSpillingStorageInfo(channelStorage, channelId); + spillingIt = SpillingStoragesInfos.emplace(channelId, info).first; + } } NActors::TActorId ParentId; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index a50b11951980..0ec468e41980 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -219,7 +219,6 @@ class TLocalWorkerManager: public TWorkerManagerCommon { } bool createComputeActor = ev->Get()->Record.GetCreateComputeActor(); TString computeActorType = ev->Get()->Record.GetComputeActorType(); - bool enableSpilling = Options.UseSpilling && ev->Get()->Record.GetEnableSpilling(); if (createComputeActor && !Options.CanUseComputeActor) { Send(ev->Sender, MakeHolder("Compute Actor Disabled", NYql::NDqProto::StatusIds::BAD_REQUEST), 0, ev->Cookie); @@ -297,15 +296,13 @@ class TLocalWorkerManager: public TWorkerManagerCommon { computeActorType, Options.TaskRunnerActorFactory, taskCounters, - ev->Get()->Record.GetStatsMode(), - enableSpilling)); + ev->Get()->Record.GetStatsMode())); } else { actor.Reset(CreateWorkerActor( Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, - Options.AsyncIoFactory, - enableSpilling)); + Options.AsyncIoFactory)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index b0d11d216c91..9185c43c485f 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -39,7 +39,6 @@ namespace NYql::NDqs { NActors::TActorId QuoterServiceActorId; bool ComputeActorOwnsCounters = false; bool DropTaskCountersOnFinish = true; - bool UseSpilling = false; }; NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options); diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp index 900eae7e2cd2..e3730e6581c4 100644 --- a/ydb/library/yql/tools/dq/worker_node/main.cpp +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -406,23 +406,25 @@ int main(int argc, char** argv) { }) : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); lwmOptions.ComputeActorOwnsCounters = true; - lwmOptions.UseSpilling = res.Has("enable-spilling"); + bool enableSpilling = res.Has("enable-spilling"); auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); auto workerManagerActorId = actorSystem->Register(resman); actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId); - auto spillingActor = actorSystem->Register( - NDq::CreateDqLocalFileSpillingService( - NDq::TFileSpillingServiceConfig{ - .Root = "./spilling", - .CleanupOnShutdown = true - }, - MakeIntrusive(dqSensors) - ) - ); - - actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor); + if (enableSpilling) { + auto spillingActor = actorSystem->Register( + NDq::CreateDqLocalFileSpillingService( + NDq::TFileSpillingServiceConfig{ + .Root = "./spilling", + .CleanupOnShutdown = true + }, + MakeIntrusive(dqSensors) + ) + ); + + actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor); + } auto endFuture = ShouldContinue.GetFuture(); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 1e99ef6e22af..0c143b77ae10 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -666,6 +666,12 @@ int RunMain(int argc, const char* argv[]) setting->SetValue("1"); } + if (res.Has("enable-spilling")) { + auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings(); + setting->SetName("SpillingEngine"); + setting->SetValue("file"); + } + TString defYtServer = gatewaysConfig.HasYt() ? NYql::TConfigClusters::GetDefaultYtServer(gatewaysConfig.GetYt()) : TString(); auto storage = CreateFS(fileStorageCfg, defYtServer); @@ -814,11 +820,11 @@ int RunMain(int argc, const char* argv[]) size_t requestTimeout = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasRequestTimeoutSeconds() ? gatewaysConfig.GetHttpGateway().GetRequestTimeoutSeconds() : 100; size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2; - const bool enableSpilling = res.Has("enable-spilling"); - dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, - enableSpilling, CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, - metricsRegistry, - metricsPusherFactory); + + bool enableSpilling = res.Has("enable-spilling"); + dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling, + CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, + metricsRegistry, metricsPusherFactory); } dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));