diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index f9664ff53a68..50ead996d9f9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -1,5 +1,6 @@ #include "dq_compute_actor.h" #include "dq_async_compute_actor.h" +#include "dq_compute_actor_async_input_helper.h" #include "dq_task_runner_exec_ctx.h" #include @@ -24,11 +25,49 @@ bool IsDebugLogEnabled(const TActorSystem* actorSystem) { } // anonymous namespace -class TDqAsyncComputeActor : public TDqComputeActorBase - , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks +struct TComputeActorAsyncInputHelperForTaskRunnerActor : public TComputeActorAsyncInputHelper { - using TBase = TDqComputeActorBase; +public: + TComputeActorAsyncInputHelperForTaskRunnerActor( + const TString& logPrefix, + ui64 index, + NDqProto::EWatermarksMode watermarksMode, + ui64& cookie, + int& inflight + ) + : TComputeActorAsyncInputHelper(logPrefix, index, watermarksMode) + , TaskRunnerActor(nullptr) + , Cookie(cookie) + , Inflight(inflight) + , FreeSpace(1) + , PushStarted(false) + {} + + i64 GetFreeSpace() const override + { + return FreeSpace; + } + + void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override + { + Inflight++; + PushStarted = true; + Finished = finished; + Y_ABORT_UNLESS(TaskRunnerActor); + TaskRunnerActor->AsyncInputPush(Cookie++, Index, std::move(batch), space, finished); + } + + NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor; + ui64& Cookie; + int& Inflight; + i64 FreeSpace; + bool PushStarted; +}; +class TDqAsyncComputeActor : public TDqComputeActorBase + , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks +{ + using TBase = TDqComputeActorBase; public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; @@ -81,6 +120,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize; limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + for (auto& [_, source]: SourcesMap) { + source.TaskRunnerActor = TaskRunnerActor; + } + + for (auto& [_, source]: InputTransformsMap) { + source.TaskRunnerActor = TaskRunnerActor; + } + + Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>); auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; @@ -105,6 +153,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase } } + template + requires(std::is_base_of::value) + T CreateInputHelper(const TString& logPrefix, + ui64 index, + NDqProto::EWatermarksMode watermarksMode + ) + { + return T(logPrefix, index, watermarksMode, Cookie, ProcessSourcesState.Inflight); + } + + const IDqAsyncInputBuffer* GetInputTransform(ui64 inputIdx, const TComputeActorAsyncInputHelperForTaskRunnerActor&) const { + return TaskRunnerStats.GetInputTransform(inputIdx); + } + private: STFUNC(StateFuncBody) { switch (ev->GetTypeRewrite()) { @@ -189,10 +251,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase return TaskRunnerStats.GetSink(outputIdx); } - const IDqAsyncInputBuffer* GetInputTransform(ui64 inputIdx, const TAsyncInputTransformInfo&) const override { - return TaskRunnerStats.GetInputTransform(inputIdx); - } - void DrainOutputChannel(TOutputChannelInfo& outputChannel) override { YQL_ENSURE(!outputChannel.Finished || Checkpoints); @@ -280,12 +338,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase DoExecute(); } - void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override { - ProcessSourcesState.Inflight++; - source.PushStarted = true; - source.Finished = finished; - TaskRunnerActor->AsyncInputPush(Cookie++, source.Index, std::move(batch), space, finished); - } void TakeInputChannelData(TChannelDataOOB&& channelDataOOB, bool ack) override { CA_LOG_T("took input"); @@ -409,10 +461,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase return inputChannel->FreeSpace; } - i64 AsyncInputFreeSpace(TAsyncInputInfoBase& source) override { - return source.FreeSpace; - } - TGuard BindAllocator() override { return TypeEnv->BindAllocator(); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h new file mode 100644 index 000000000000..1c41ac1ebd9c --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -0,0 +1,103 @@ +#pragma once +#include "dq_compute_actor_async_io.h" +#include "dq_compute_issues_buffer.h" +#include "dq_compute_actor_metrics.h" +#include "dq_compute_actor_watermarks.h" + +//must be included the last +#include "dq_compute_actor_log.h" + +namespace NYql::NDq { + +constexpr ui32 IssuesBufferSize = 16; + +struct TComputeActorAsyncInputHelper { + TString Type; + const TString LogPrefix; + ui64 Index; + IDqComputeActorAsyncInput* AsyncInput = nullptr; + NActors::IActor* Actor = nullptr; + TIssuesBuffer IssuesBuffer; + bool Finished = false; + const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; + TMaybe PendingWatermark = Nothing(); +public: + TComputeActorAsyncInputHelper( + const TString& logPrefix, + ui64 index, + NDqProto::EWatermarksMode watermarksMode) + : LogPrefix(logPrefix) + , Index(index) + , IssuesBuffer(IssuesBufferSize) + , WatermarksMode(watermarksMode) {} + + bool IsPausedByWatermark() { + return PendingWatermark.Defined(); + } + + void Pause(TInstant watermark) { + YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED); + PendingWatermark = watermark; + } + + void ResumeByWatermark(TInstant watermark) { + YQL_ENSURE(watermark == PendingWatermark); + PendingWatermark = Nothing(); + } + + virtual i64 GetFreeSpace() const = 0; + virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) = 0; + + TMaybe PollAsyncInput(TDqComputeActorMetrics& metricsReporter, TDqComputeActorWatermarks& watermarksTracker, i64 asyncInputPushLimit) { + if (Finished) { + CA_LOG_T("Skip polling async input[" << Index << "]: finished"); + return {}; + } + + if (IsPausedByWatermark()) { + CA_LOG_T("Skip polling async input[" << Index << "]: paused"); + return {}; + } + + const i64 freeSpace = GetFreeSpace(); + if (freeSpace > 0) { + TMaybe watermark; + NKikimr::NMiniKQL::TUnboxedValueBatch batch; + Y_ABORT_UNLESS(AsyncInput); + bool finished = false; + const i64 space = AsyncInput->GetAsyncInputData(batch, watermark, finished, std::min(freeSpace, asyncInputPushLimit)); + CA_LOG_T("Poll async input " << Index + << ". Buffer free space: " << freeSpace + << ", read from async input: " << space << " bytes, " + << batch.RowCount() << " rows, finished: " << finished); + + metricsReporter.ReportAsyncInputData(Index, batch.RowCount(), space, watermark); + + if (watermark) { + const auto inputWatermarkChanged = watermarksTracker.NotifyAsyncInputWatermarkReceived( + Index, + *watermark); + + if (inputWatermarkChanged) { + CA_LOG_T("Pause async input " << Index << " because of watermark " << *watermark); + Pause(*watermark); + } + } + AsyncInputPush(std::move(batch), space, finished); + if (!batch.empty()) { + // If we have read some data, we must run such reading again + // to process the case when async input notified us about new data + // but we haven't read all of it. + return EResumeSource::CAPollAsync; + } + + } else { + CA_LOG_T("Skip polling async input[" << Index << "]: no free space: " << freeSpace); + return EResumeSource::CAPollAsyncNoSpace; // If there is no free space in buffer, => we have something to process + } + return {}; + } +}; + +} //namespace NYql::NDq + diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index f8e194fe2d2f..c06bfb99d37f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -35,33 +35,12 @@ #include #include -#if defined CA_LOG_D || defined CA_LOG_I || defined CA_LOG_E || defined CA_LOG_C -# error log macro definition clash -#endif - -#define CA_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define CA_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) - +#include "dq_compute_actor_async_input_helper.h" +#include "dq_compute_actor_log.h" namespace NYql { namespace NDq { -constexpr ui32 IssuesBufferSize = 16; - struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, NYql::NDqProto::StatusIds::StatusCode fatalCode) override final { OnSinkError(outputIndex, issues, fatalCode); @@ -111,7 +90,7 @@ struct TComputeActorStateFuncHelper { } // namespace NDetails -template +template class TDqComputeActorBase : public NActors::TActorBootstrapped , public TDqComputeActorChannels::ICallbacks , public TDqComputeActorCheckpoints::ICallbacks @@ -126,7 +105,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped RlNoResourceTag = 102, }; - public: void Bootstrap() { try { @@ -854,7 +832,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ythrow yexception() << "Invalid state version " << version; } for (const NDqProto::TSourceState& sourceState : state.GetSources()) { - TAsyncInputInfoBase* source = SourcesMap.FindPtr(sourceState.GetInputIndex()); + TAsyncInputHelper* source = SourcesMap.FindPtr(sourceState.GetInputIndex()); YQL_ENSURE(source, "Failed to load state. Source with input index " << sourceState.GetInputIndex() << " was not found"); YQL_ENSURE(source->AsyncInput, "Source[" << sourceState.GetInputIndex() << "] is not created"); source->AsyncInput->LoadState(sourceState); @@ -952,49 +930,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } }; - struct TAsyncInputInfoBase { - TString Type; - const TString LogPrefix; - ui64 Index; - IDqAsyncInputBuffer::TPtr Buffer; - IDqComputeActorAsyncInput* AsyncInput = nullptr; - NActors::IActor* Actor = nullptr; - TIssuesBuffer IssuesBuffer; - bool Finished = false; - i64 FreeSpace = 1; - bool PushStarted = false; - const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; - TMaybe PendingWatermark = Nothing(); - - explicit TAsyncInputInfoBase( - const TString& logPrefix, - ui64 index, - NDqProto::EWatermarksMode watermarksMode) - : LogPrefix(logPrefix) - , Index(index) - , IssuesBuffer(IssuesBufferSize) - , WatermarksMode(watermarksMode) {} - - bool IsPausedByWatermark() { - return PendingWatermark.Defined(); - } - - void Pause(TInstant watermark) { - YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED); - PendingWatermark = watermark; - } - - void ResumeByWatermark(TInstant watermark) { - YQL_ENSURE(watermark == PendingWatermark); - PendingWatermark = Nothing(); - } - }; - - struct TAsyncInputTransformInfo : public TAsyncInputInfoBase { + struct TAsyncInputTransformHelper : TAsyncInputHelper { NUdf::TUnboxedValue InputBuffer; TMaybe ProgramBuilder; - using TAsyncInputInfoBase::TAsyncInputInfoBase; + using TAsyncInputHelper::TAsyncInputHelper; }; struct TOutputChannelInfo { @@ -1155,18 +1095,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return TaskRunner->BindAllocator(); } - virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) { - source.Buffer->Push(std::move(batch), space); - if (finished) { - source.Buffer->Finish(); - source.Finished = true; - } - } - - virtual i64 AsyncInputFreeSpace(TAsyncInputInfoBase& source) { - return source.Buffer->GetFreeSpace(); - } - virtual bool SayHelloOnBootstrap() { return true; } @@ -1573,7 +1501,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } auto collectStatsLevel = StatsModeToCollectStatsLevel(RuntimeSettings.StatsMode); for (auto& [inputIndex, source] : SourcesMap) { - if (TaskRunner) { source.Buffer = TaskRunner->GetSource(inputIndex); Y_ABORT_UNLESS(source.Buffer);} + if constexpr (!TDerived::HasAsyncTaskRunner) { + source.Buffer = TaskRunner->GetSource(inputIndex); + Y_ABORT_UNLESS(source.Buffer); + } Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); Y_ABORT_UNLESS(inputDesc.HasSource()); @@ -1607,7 +1538,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { - if (TaskRunner) { + if constexpr (!TDerived::HasAsyncTaskRunner) { transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); Y_ABORT_UNLESS(AsyncIoFactory); @@ -1698,58 +1629,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - void PollAsyncInput(TAsyncInputInfoBase& info, ui64 inputIndex) { - Y_ABORT_UNLESS(!TaskRunner || info.Buffer); - - if (info.Finished) { - CA_LOG_T("Skip polling async input[" << inputIndex << "]: finished"); - return; - } - - if (info.IsPausedByWatermark()) { - CA_LOG_T("Skip polling async input[" << inputIndex << "]: paused"); - return; - } - - const i64 freeSpace = AsyncInputFreeSpace(info); - if (freeSpace > 0) { - TMaybe watermark; - NKikimr::NMiniKQL::TUnboxedValueBatch batch; - Y_ABORT_UNLESS(info.AsyncInput); - bool finished = false; - const i64 space = info.AsyncInput->GetAsyncInputData(batch, watermark, finished, std::min(freeSpace, RuntimeSettings.AsyncInputPushLimit)); - CA_LOG_T("Poll async input " << inputIndex - << ". Buffer free space: " << freeSpace - << ", read from async input: " << space << " bytes, " - << batch.RowCount() << " rows, finished: " << finished); - - if (!batch.empty()) { - // If we have read some data, we must run such reading again - // to process the case when async input notified us about new data - // but we haven't read all of it. - ContinueExecute(EResumeSource::CAPollAsync); - } - - MetricsReporter.ReportAsyncInputData(inputIndex, batch.RowCount(), space, watermark); - - if (watermark) { - const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived( - inputIndex, - *watermark); - - if (inputWatermarkChanged) { - CA_LOG_T("Pause async input " << inputIndex << " because of watermark " << *watermark); - info.Pause(*watermark); - } - } - - AsyncInputPush(std::move(batch), info, space, finished); - } else { - CA_LOG_T("Skip polling async input[" << inputIndex << "]: no free space: " << freeSpace); - ContinueExecute(EResumeSource::CAPollAsyncNoSpace); // If there is no free space in buffer, => we have something to process - } - } - void PollAsyncInput() { // Don't produce any input from sources if we're about to save checkpoint. if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) { @@ -1759,12 +1638,16 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped CA_LOG_T("Poll sources"); for (auto& [inputIndex, source] : SourcesMap) { - PollAsyncInput(source, inputIndex); + if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { + ContinueExecute(*resume); + } } CA_LOG_T("Poll inputs"); for (auto& [inputIndex, transform] : InputTransformsMap) { - PollAsyncInput(transform, inputIndex); + if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { + ContinueExecute(*resume); + } } } @@ -1859,16 +1742,18 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped if (inputDesc.HasTransform()) { auto result = InputTransformsMap.emplace( - std::piecewise_construct, - std::make_tuple(i), - std::make_tuple(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED) + i, + static_cast(this)->template CreateInputHelper(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED) ); YQL_ENSURE(result.second); } if (inputDesc.HasSource()) { const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode(); - auto result = SourcesMap.emplace(i, TAsyncInputInfoBase(LogPrefix, i, watermarksMode)); + auto result = SourcesMap.emplace( + i, + static_cast(this)->template CreateInputHelper(LogPrefix, i, watermarksMode) + ); YQL_ENSURE(result.second); } else { for (auto& channel : inputDesc.GetChannels()) { @@ -1950,10 +1835,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return sinkInfo.Buffer.Get(); } - virtual const IDqAsyncInputBuffer* GetInputTransform(ui64, const TAsyncInputTransformInfo& inputTransformInfo) const { - return inputTransformInfo.Buffer.Get(); - } - public: TDuration GetSourceCpuTime() const { @@ -2103,7 +1984,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } for (auto& [inputIndex, transformInfo] : InputTransformsMap) { - auto* transform = GetInputTransform(inputIndex, transformInfo); + auto* transform = static_cast(this)->GetInputTransform(inputIndex, transformInfo); if (transform && RuntimeSettings.CollectFull()) { // TODO: Ingress clarification auto& protoTransform = *protoTask->AddInputTransforms(); @@ -2248,8 +2129,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TDqComputeActorChannels* Channels = nullptr; TDqComputeActorCheckpoints* Checkpoints = nullptr; THashMap InputChannelsMap; // Channel id -> Channel info - THashMap SourcesMap; // Input index -> Source info - THashMap InputTransformsMap; // Input index -> Transforms info + THashMap SourcesMap; // Input index -> Source info + THashMap InputTransformsMap; // Input index -> Transforms info THashMap OutputChannelsMap; // Channel id -> Channel info THashMap SinksMap; // Output index -> Sink info THashMap OutputTransformsMap; // Output index -> Transforms info diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h new file mode 100644 index 000000000000..aed3dc524015 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h @@ -0,0 +1,24 @@ +#pragma once +#include + +#if defined CA_LOG_D || defined CA_LOG_I || defined CA_LOG_E || defined CA_LOG_C +# error log macro definition clash +#endif + +#define CA_LOG_T(s) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_D(s) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_I(s) \ + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_W(s) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_N(s) \ + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_E(s) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG_C(s) \ + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define CA_LOG(prio, s) \ + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 41d0401c0a3f..0d10b8e7c861 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -1,12 +1,48 @@ -#include "./dq_compute_actor_impl.h" +#include "dq_compute_actor_impl.h" +#include "dq_compute_actor_async_input_helper.h" namespace NYql::NDq { +struct TComputeActorAsyncInputHelperForTaskRunner : public TComputeActorAsyncInputHelper +{ +public: + using TComputeActorAsyncInputHelper::TComputeActorAsyncInputHelper; + + void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override { + Buffer->Push(std::move(batch), space); + if (finished) { + Buffer->Finish(); + Finished = true; + } + } + i64 GetFreeSpace() const override{ + return Buffer->GetFreeSpace(); + } + + IDqAsyncInputBuffer::TPtr Buffer; +}; + template -class TDqSyncComputeActorBase: public TDqComputeActorBase { +class TDqSyncComputeActorBase: public TDqComputeActorBase { + using TBase = TDqComputeActorBase; public: - using TDqComputeActorBase::TDqComputeActorBase; + using TDqComputeActorBase::TDqComputeActorBase; static constexpr bool HasAsyncTaskRunner = false; + + template + requires(std::is_base_of::value) + T CreateInputHelper(const TString& logPrefix, + ui64 index, + NDqProto::EWatermarksMode watermarksMode + ) + { + return T(logPrefix, index, watermarksMode); + } + + const IDqAsyncInputBuffer* GetInputTransform(ui64, const TComputeActorAsyncInputHelperForTaskRunner& inputTransformInfo) const + { + return inputTransformInfo.Buffer.Get(); + } }; } //namespace NYql::NDq diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index e0ee17b9cd1c..b4f59892afca 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -122,7 +122,7 @@ struct TAggInfo { std::vector ArgsColumns; }; -class TProgramBuilder : private TNonCopyable { +class TProgramBuilder : public TMoveOnly { public: TProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry, bool voidWithEffects = false);