diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 3e3d5fd7173d..62d3c1fcb3b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -48,13 +48,6 @@ class TFetchedData { } } - std::shared_ptr GetBatch() const { - if (!Table) { - return nullptr; - } - return NArrow::ToBatch(Table, true); - } - void AddNulls(THashMap&& blobs) { for (auto&& i : blobs) { AFL_VERIFY(Blobs.emplace(i.first, i.second).second); @@ -109,4 +102,17 @@ class TFetchedData { }; +class TFetchedResult { +private: + YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(std::shared_ptr, NotAppliedFilter); +public: + TFetchedResult(std::unique_ptr&& data) + : NotAppliedFilter(data->GetNotAppliedFilter()) { + if (data->GetTable()) { + Batch = NArrow::ToBatch(data->GetTable(), true); + } + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 0773b0c50a45..96923fed2a2f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -16,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { bool TStepAction::DoExecute() { while (Step) { if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } @@ -23,11 +24,13 @@ bool TStepAction::DoExecute() { return true; } if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } Step = Step->GetNextStep(); } + Source->Finalize(); FinishedFlag = true; return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 5b55803cd3c8..dce2a8a3e1a3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -60,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); + ResultBatch = Sources.begin()->second->GetStageResult().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -81,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } std::shared_ptr merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetStageData().GetBatch()) { - merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); + if (auto rb = i->GetStageResult().GetBatch()) { + merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 6bc4ddf410b9..e52f769a8172 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -15,7 +15,7 @@ namespace NKikimr::NOlap::NPlainReader { void IDataSource::InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive) { AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - StageData = std::make_shared(isExclusive); + StageData = std::make_unique(isExclusive); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index b8765b373e7a..f339312ecfd7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -36,7 +36,8 @@ class IDataSource { protected: THashMap Intervals; - std::shared_ptr StageData; + std::unique_ptr StageData; + std::unique_ptr StageResult; TAtomic FilterStageFlag = 0; bool IsReadyFlag = false; @@ -51,8 +52,17 @@ class IDataSource { virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const TFetchedResult& GetStageResult() const { + AFL_VERIFY(!!StageResult); + return *StageResult; + } + void SetIsReady(); + void Finalize() { + StageResult = std::make_unique(std::move(StageData)); + } + bool IsEmptyData() const { return GetStageData().IsEmpty(); }