Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "dq_compute_actor.h"
#include "dq_async_compute_actor.h"
#include "dq_compute_actor_async_input_helper.h"
#include "dq_task_runner_exec_ctx.h"

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
Expand All @@ -24,11 +25,49 @@ bool IsDebugLogEnabled(const TActorSystem* actorSystem) {

} // anonymous namespace

class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
, public NTaskRunnerActor::ITaskRunnerActor::ICallbacks
// Async input helper that forwards every polled batch to a task runner actor.
//
// The helper does not own the cookie and in-flight counters: it holds
// references to variables supplied by whoever constructs it and updates
// them on each push. TaskRunnerActor starts out null and has to be assigned
// before the first AsyncInputPush() call.
struct TComputeActorAsyncInputHelperForTaskRunnerActor : public TComputeActorAsyncInputHelper
{
// NOTE(review): the next two lines look like leftovers of the previous
// TDqAsyncComputeActor class body (diff residue) — confirm they are not
// meant to be part of this struct.
using TBase = TDqComputeActorBase<TDqAsyncComputeActor>;
public:
// Binds the helper to the caller-owned `cookie` and `inflight` counters.
// The task runner pointer is left null; FreeSpace starts at 1 and
// PushStarted at false.
TComputeActorAsyncInputHelperForTaskRunnerActor(
const TString& logPrefix,
ui64 index,
NDqProto::EWatermarksMode watermarksMode,
ui64& cookie,
int& inflight
)
: TComputeActorAsyncInputHelper(logPrefix, index, watermarksMode)
, TaskRunnerActor(nullptr)
, Cookie(cookie)
, Inflight(inflight)
, FreeSpace(1)
, PushStarted(false)
{}

// Returns the locally stored FreeSpace value.
i64 GetFreeSpace() const override
{
return FreeSpace;
}

// Records the push (in-flight counter, PushStarted, Finished) and hands
// the batch to the task runner actor under a fresh cookie value.
// Aborts if TaskRunnerActor has not been assigned yet.
void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override
{
Inflight++;
PushStarted = true;
Finished = finished;
Y_ABORT_UNLESS(TaskRunnerActor);
// Post-increment: this push uses the current cookie, the next one gets cookie + 1.
TaskRunnerActor->AsyncInputPush(Cookie++, Index, std::move(batch), space, finished);
}

NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor; // non-owning; null until assigned by the owner
ui64& Cookie; // caller-owned cookie counter, incremented on every push
int& Inflight; // caller-owned counter of pushes, incremented on every push
i64 FreeSpace; // value reported by GetFreeSpace(); not modified inside this struct
bool PushStarted; // set to true by the first AsyncInputPush()
};

class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TComputeActorAsyncInputHelperForTaskRunnerActor>
, public NTaskRunnerActor::ITaskRunnerActor::ICallbacks
{
using TBase = TDqComputeActorBase<TDqAsyncComputeActor, TComputeActorAsyncInputHelperForTaskRunnerActor>;
public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";

Expand Down Expand Up @@ -81,6 +120,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize;
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;

for (auto& [_, source]: SourcesMap) {
source.TaskRunnerActor = TaskRunnerActor;
}

for (auto& [_, source]: InputTransformsMap) {
source.TaskRunnerActor = TaskRunnerActor;
}


Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);

auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
Expand All @@ -105,6 +153,20 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
}
}

template<typename T>
requires(std::is_base_of<TComputeActorAsyncInputHelperForTaskRunnerActor, T>::value)
T CreateInputHelper(const TString& logPrefix,
ui64 index,
NDqProto::EWatermarksMode watermarksMode
)
{
return T(logPrefix, index, watermarksMode, Cookie, ProcessSourcesState.Inflight);
}

// Looks up the input transform buffer for `inputIdx` in the task runner
// statistics. The helper argument only selects this overload; it is not read.
const IDqAsyncInputBuffer* GetInputTransform(ui64 inputIdx, const TComputeActorAsyncInputHelperForTaskRunnerActor&) const {
    const IDqAsyncInputBuffer* const transformBuffer = TaskRunnerStats.GetInputTransform(inputIdx);
    return transformBuffer;
}

private:
STFUNC(StateFuncBody) {
switch (ev->GetTypeRewrite()) {
Expand Down Expand Up @@ -189,10 +251,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
return TaskRunnerStats.GetSink(outputIdx);
}

// Looks up the input transform buffer for `inputIdx` in the task runner
// statistics. The transform-info argument is not read.
const IDqAsyncInputBuffer* GetInputTransform(ui64 inputIdx, const TAsyncInputTransformInfo&) const override {
    const IDqAsyncInputBuffer* const transformBuffer = TaskRunnerStats.GetInputTransform(inputIdx);
    return transformBuffer;
}

void DrainOutputChannel(TOutputChannelInfo& outputChannel) override {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);

Expand Down Expand Up @@ -280,12 +338,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
DoExecute();
}

// Marks the push on `source` and on the shared in-flight counter, then
// forwards the batch to the task runner actor under a fresh cookie value.
void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override {
    ProcessSourcesState.Inflight++;

    source.PushStarted = true;
    source.Finished = finished;

    // Post-increment: this push uses the current cookie, the next one gets cookie + 1.
    const auto pushCookie = Cookie++;
    TaskRunnerActor->AsyncInputPush(pushCookie, source.Index, std::move(batch), space, finished);
}

void TakeInputChannelData(TChannelDataOOB&& channelDataOOB, bool ack) override {
CA_LOG_T("took input");
Expand Down Expand Up @@ -409,10 +461,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
return inputChannel->FreeSpace;
}

// Reports the free space currently recorded on the given async input.
i64 AsyncInputFreeSpace(TAsyncInputInfoBase& source) override {
    const i64 available = source.FreeSpace;
    return available;
}

// Delegates to TypeEnv->BindAllocator() and returns the guard it produces.
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() override {
return TypeEnv->BindAllocator();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#pragma once
#include "dq_compute_actor_async_io.h"
#include "dq_compute_issues_buffer.h"
#include "dq_compute_actor_metrics.h"
#include "dq_compute_actor_watermarks.h"

// Must be included last.
#include "dq_compute_actor_log.h"

namespace NYql::NDq {

// Value passed to the TIssuesBuffer constructor of every async input helper
// (presumably the maximum number of retained issues — confirm against TIssuesBuffer).
constexpr ui32 IssuesBufferSize = 16;

// State and polling logic for one async input of a compute actor.
//
// Derived helpers decide where polled data goes by implementing
// GetFreeSpace() and AsyncInputPush(); this base class drives the polling,
// metrics reporting and watermark-based pausing.
struct TComputeActorAsyncInputHelper {
    TString Type;
    const TString LogPrefix; // presumably picked up by the CA_LOG_* macros — confirm
    ui64 Index; // input index used in logs, metrics and watermark notifications
    IDqComputeActorAsyncInput* AsyncInput = nullptr; // must be assigned before data is polled
    NActors::IActor* Actor = nullptr;
    TIssuesBuffer IssuesBuffer;
    bool Finished = false; // once true, PollAsyncInput() skips this input
    const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
    TMaybe<TInstant> PendingWatermark = Nothing(); // defined while the input is paused
public:
    TComputeActorAsyncInputHelper(
        const TString& logPrefix,
        ui64 index,
        NDqProto::EWatermarksMode watermarksMode)
        : LogPrefix(logPrefix)
        , Index(index)
        , IssuesBuffer(IssuesBufferSize)
        , WatermarksMode(watermarksMode) {}

    // True while a watermark is pending, i.e. the input is paused.
    bool IsPausedByWatermark() {
        return PendingWatermark.Defined();
    }

    // Pauses the input until ResumeByWatermark() is called with the same
    // watermark. Only valid when watermarks are enabled for this input.
    void Pause(TInstant watermark) {
        YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED);
        PendingWatermark = watermark;
    }

    // Clears the pending watermark; `watermark` must equal the pending one.
    void ResumeByWatermark(TInstant watermark) {
        YQL_ENSURE(watermark == PendingWatermark);
        PendingWatermark = Nothing();
    }

    // Free space available for pushed data; polling happens only when it is > 0.
    virtual i64 GetFreeSpace() const = 0;

    // Consumes one polled batch. `space` is the value returned by the async
    // input for this read, `finished` tells whether the input reported completion.
    virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) = 0;

    // Reads one batch from the async input (bounded by the free space and by
    // `asyncInputPushLimit`), reports it to metrics, handles a received
    // watermark and pushes the batch via AsyncInputPush().
    //
    // Returns:
    //   - EResumeSource::CAPollAsync        if a non-empty batch was read, so the
    //                                       caller should poll again;
    //   - EResumeSource::CAPollAsyncNoSpace if there is no free space;
    //   - nothing                           if the input is finished, paused by a
    //                                       watermark, or the batch read was empty.
    TMaybe<EResumeSource> PollAsyncInput(TDqComputeActorMetrics& metricsReporter, TDqComputeActorWatermarks& watermarksTracker, i64 asyncInputPushLimit) {
        if (Finished) {
            CA_LOG_T("Skip polling async input[" << Index << "]: finished");
            return {};
        }

        if (IsPausedByWatermark()) {
            CA_LOG_T("Skip polling async input[" << Index << "]: paused");
            return {};
        }

        const i64 freeSpace = GetFreeSpace();
        if (freeSpace <= 0) {
            CA_LOG_T("Skip polling async input[" << Index << "]: no free space: " << freeSpace);
            return EResumeSource::CAPollAsyncNoSpace; // If there is no free space in buffer, => we have something to process
        }

        TMaybe<TInstant> watermark;
        NKikimr::NMiniKQL::TUnboxedValueBatch batch;
        Y_ABORT_UNLESS(AsyncInput);
        bool finished = false;
        const i64 space = AsyncInput->GetAsyncInputData(batch, watermark, finished, std::min(freeSpace, asyncInputPushLimit));
        CA_LOG_T("Poll async input " << Index
            << ". Buffer free space: " << freeSpace
            << ", read from async input: " << space << " bytes, "
            << batch.RowCount() << " rows, finished: " << finished);

        metricsReporter.ReportAsyncInputData(Index, batch.RowCount(), space, watermark);

        if (watermark) {
            const auto inputWatermarkChanged = watermarksTracker.NotifyAsyncInputWatermarkReceived(
                Index,
                *watermark);

            if (inputWatermarkChanged) {
                CA_LOG_T("Pause async input " << Index << " because of watermark " << *watermark);
                Pause(*watermark);
            }
        }

        // The batch is handed over as an rvalue below and must not be inspected
        // afterwards, so remember whether it carried data before the push.
        const bool hasData = !batch.empty();
        AsyncInputPush(std::move(batch), space, finished);
        if (hasData) {
            // If we have read some data, we must run such reading again
            // to process the case when async input notified us about new data
            // but we haven't read all of it.
            return EResumeSource::CAPollAsync;
        }

        return {};
    }
};

} //namespace NYql::NDq

Loading