From 068f067c77da3e653305c992aab07cf09435a0fa Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 13:49:11 +0300 Subject: [PATCH 1/6] use cache for fetched data compiled result --- .../engines/reader/plain_reader/context.cpp | 5 ++++- .../engines/reader/plain_reader/fetched_data.h | 12 ++++++++++++ .../engines/reader/plain_reader/fetching.cpp | 5 +++++ .../engines/reader/plain_reader/fetching.h | 13 +++++++++++++ .../engines/reader/plain_reader/interval.cpp | 6 +++--- .../engines/reader/plain_reader/source.cpp | 2 +- .../engines/reader/plain_reader/source.h | 12 +++++++++++- 7 files changed, 49 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index cf07daf49f8f..4fb1901598da 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -21,7 +21,9 @@ std::shared_ptr TSpecialReadContext const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; if (!result) { - return std::make_shared(source->GetRecordsCount(), "fake"); + std::shared_ptr result = std::make_shared(source->GetRecordsCount(), "fake"); + result->AttachNext(std::make_shared()); + return result; } return result; } @@ -101,6 +103,7 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } } + current = current->AttachNext(std::make_shared()); return result->GetNextStep(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 3e3d5fd7173d..8647345d6ce5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -109,4 +109,16 @@ class TFetchedData { }; +class TFetchedResult { +private: + YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(std::shared_ptr, NotAppliedFilter); +public: + TFetchedResult(std::unique_ptr&& data) + : Batch(data->GetBatch()) + , NotAppliedFilter(data->GetNotAppliedFilter()) { + + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 0773b0c50a45..5915f56b562f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -86,4 +86,9 @@ bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& sourc return true; } +bool TFinalizeSourceStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + source->Finalize(); + return true; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h index 6446538535da..39dc6560077b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -181,6 +181,19 @@ class TAssemblerStep: public IFetchingStep { } }; +class TFinalizeSourceStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + virtual TString DoDebugString() const override { + return ""; + } +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; + TFinalizeSourceStep() + : TBase("FINALIZE") { + } +}; + class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 5b55803cd3c8..dce2a8a3e1a3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -60,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); + ResultBatch = Sources.begin()->second->GetStageResult().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -81,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } std::shared_ptr merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetStageData().GetBatch()) { - merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); + if (auto rb = i->GetStageResult().GetBatch()) { + merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 6bc4ddf410b9..e52f769a8172 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -15,7 +15,7 @@ namespace NKikimr::NOlap::NPlainReader { void IDataSource::InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive) { AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - StageData = std::make_shared(isExclusive); + StageData = std::make_unique(isExclusive); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index b8765b373e7a..f339312ecfd7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -36,7 +36,8 @@ class IDataSource { protected: THashMap Intervals; - std::shared_ptr StageData; + std::unique_ptr StageData; + std::unique_ptr StageResult; TAtomic FilterStageFlag = 0; bool IsReadyFlag = false; @@ -51,8 +52,17 @@ class IDataSource { virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const TFetchedResult& GetStageResult() const { + AFL_VERIFY(!!StageResult); + return *StageResult; + } + void SetIsReady(); + void Finalize() { + StageResult = std::make_unique(std::move(StageData)); + } + bool IsEmptyData() const { return GetStageData().IsEmpty(); } From f1ab76d61690d5526bf3c2691eb70ae8f509f4ca Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 13:54:38 +0300 Subject: [PATCH 2/6] correction --- .../engines/reader/plain_reader/context.cpp | 5 +---- .../engines/reader/plain_reader/fetching.cpp | 8 +++----- .../engines/reader/plain_reader/fetching.h | 13 ------------- 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index 4fb1901598da..cf07daf49f8f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -21,9 +21,7 @@ std::shared_ptr TSpecialReadContext const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; if (!result) { - std::shared_ptr result = std::make_shared(source->GetRecordsCount(), "fake"); - result->AttachNext(std::make_shared()); - return result; + return std::make_shared(source->GetRecordsCount(), "fake"); } return result; } @@ -103,7 +101,6 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } } - current = current->AttachNext(std::make_shared()); return result->GetNextStep(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 5915f56b562f..96923fed2a2f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -16,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { bool TStepAction::DoExecute() { while (Step) { if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } @@ -23,11 +24,13 @@ bool TStepAction::DoExecute() { return true; } if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } Step = Step->GetNextStep(); } + Source->Finalize(); FinishedFlag = true; return true; } @@ -86,9 +89,4 @@ bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& sourc return true; } -bool TFinalizeSourceStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { - source->Finalize(); - return true; -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h index 39dc6560077b..6446538535da 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -181,19 +181,6 @@ class TAssemblerStep: public IFetchingStep { } }; -class TFinalizeSourceStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - virtual TString DoDebugString() const override { - return ""; - } -public: - virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; - TFinalizeSourceStep() - : TBase("FINALIZE") { - } -}; - class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; From 5f2e30790693be4cb99221b7fb08be573d8b6f30 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 13:49:11 +0300 Subject: [PATCH 3/6] use cache for fetched data compiled result --- .../engines/reader/plain_reader/context.cpp | 5 ++++- .../engines/reader/plain_reader/fetched_data.h | 12 ++++++++++++ .../engines/reader/plain_reader/fetching.cpp | 5 +++++ .../engines/reader/plain_reader/fetching.h | 13 +++++++++++++ .../engines/reader/plain_reader/interval.cpp | 6 +++--- .../engines/reader/plain_reader/source.cpp | 2 +- .../engines/reader/plain_reader/source.h | 12 +++++++++++- 7 files changed, 49 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index cf07daf49f8f..4fb1901598da 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -21,7 +21,9 @@ std::shared_ptr TSpecialReadContext const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; if (!result) { - return std::make_shared(source->GetRecordsCount(), "fake"); + std::shared_ptr result = std::make_shared(source->GetRecordsCount(), "fake"); + result->AttachNext(std::make_shared()); + return result; } return result; } @@ -101,6 +103,7 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } } + current = current->AttachNext(std::make_shared()); return result->GetNextStep(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 3e3d5fd7173d..8647345d6ce5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -109,4 +109,16 @@ class TFetchedData { }; +class TFetchedResult { +private: + YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(std::shared_ptr, NotAppliedFilter); +public: + TFetchedResult(std::unique_ptr&& data) + : Batch(data->GetBatch()) + , NotAppliedFilter(data->GetNotAppliedFilter()) { + + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 0773b0c50a45..5915f56b562f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -86,4 +86,9 @@ bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& sourc return true; } +bool TFinalizeSourceStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + source->Finalize(); + return true; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h index 6446538535da..39dc6560077b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -181,6 +181,19 @@ class TAssemblerStep: public IFetchingStep { } }; +class TFinalizeSourceStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + virtual TString DoDebugString() const override { + return ""; + } +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; + TFinalizeSourceStep() + : TBase("FINALIZE") { + } +}; + class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 5b55803cd3c8..dce2a8a3e1a3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -60,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); + ResultBatch = Sources.begin()->second->GetStageResult().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -81,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } std::shared_ptr merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetStageData().GetBatch()) { - merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); + if (auto rb = i->GetStageResult().GetBatch()) { + merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 6bc4ddf410b9..e52f769a8172 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -15,7 +15,7 @@ namespace NKikimr::NOlap::NPlainReader { void IDataSource::InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive) { AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - StageData = std::make_shared(isExclusive); + StageData = std::make_unique(isExclusive); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index b8765b373e7a..f339312ecfd7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -36,7 +36,8 @@ class IDataSource { protected: THashMap Intervals; - std::shared_ptr StageData; + std::unique_ptr StageData; + std::unique_ptr StageResult; TAtomic FilterStageFlag = 0; bool IsReadyFlag = false; @@ -51,8 +52,17 @@ class IDataSource { virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const TFetchedResult& GetStageResult() const { + AFL_VERIFY(!!StageResult); + return *StageResult; + } + void SetIsReady(); + void Finalize() { + StageResult = std::make_unique(std::move(StageData)); + } + bool IsEmptyData() const { return GetStageData().IsEmpty(); } From 08cd31181ec095d29689da9be7a6f3ae8c450853 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 13:54:38 +0300 Subject: [PATCH 4/6] correction --- .../engines/reader/plain_reader/context.cpp | 5 +---- .../engines/reader/plain_reader/fetching.cpp | 8 +++----- .../engines/reader/plain_reader/fetching.h | 13 ------------- 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index 4fb1901598da..cf07daf49f8f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -21,9 +21,7 @@ std::shared_ptr TSpecialReadContext const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; if (!result) { - std::shared_ptr result = std::make_shared(source->GetRecordsCount(), "fake"); - result->AttachNext(std::make_shared()); - return result; + return std::make_shared(source->GetRecordsCount(), "fake"); } return result; } @@ -103,7 +101,6 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); } } - current = current->AttachNext(std::make_shared()); return result->GetNextStep(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 5915f56b562f..96923fed2a2f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -16,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { bool TStepAction::DoExecute() { while (Step) { if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } @@ -23,11 +24,13 @@ bool TStepAction::DoExecute() { return true; } if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } Step = Step->GetNextStep(); } + Source->Finalize(); FinishedFlag = true; return true; } @@ -86,9 +89,4 @@ bool TApplyIndexStep::DoExecuteInplace(const std::shared_ptr& sourc return true; } -bool TFinalizeSourceStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { - source->Finalize(); - return true; -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h index 39dc6560077b..6446538535da 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -181,19 +181,6 @@ class TAssemblerStep: public IFetchingStep { } }; -class TFinalizeSourceStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - virtual TString DoDebugString() const override { - return ""; - } -public: - virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; - TFinalizeSourceStep() - : TBase("FINALIZE") { - } -}; - class TFilterProgramStep: public IFetchingStep { private: using TBase = IFetchingStep; From cf9da46928a1ac31cca49faff55ed7d945ed072e Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 14:04:47 +0300 Subject: [PATCH 5/6] correction --- .../engines/reader/plain_reader/fetched_data.h | 12 ++++++++++++ .../engines/reader/plain_reader/fetching.cpp | 3 +++ .../engines/reader/plain_reader/interval.cpp | 6 +++--- .../engines/reader/plain_reader/source.cpp | 2 +- .../columnshard/engines/reader/plain_reader/source.h | 12 +++++++++++- 5 files changed, 30 insertions(+), 5 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 3e3d5fd7173d..8647345d6ce5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -109,4 +109,16 @@ class TFetchedData { }; +class TFetchedResult { +private: + YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(std::shared_ptr, NotAppliedFilter); +public: + TFetchedResult(std::unique_ptr&& data) + : Batch(data->GetBatch()) + , NotAppliedFilter(data->GetNotAppliedFilter()) { + + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 0773b0c50a45..96923fed2a2f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -16,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { bool TStepAction::DoExecute() { while (Step) { if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } @@ -23,11 +24,13 @@ bool TStepAction::DoExecute() { return true; } if (Source->IsEmptyData()) { + Source->Finalize(); FinishedFlag = true; return true; } Step = Step->GetNextStep(); } + Source->Finalize(); FinishedFlag = true; return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 5b55803cd3c8..dce2a8a3e1a3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -60,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); + ResultBatch = Sources.begin()->second->GetStageResult().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -81,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } std::shared_ptr merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetStageData().GetBatch()) { - merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); + if (auto rb = i->GetStageResult().GetBatch()) { + merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 6bc4ddf410b9..e52f769a8172 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -15,7 +15,7 @@ namespace NKikimr::NOlap::NPlainReader { void IDataSource::InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive) { AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - StageData = std::make_shared(isExclusive); + StageData = std::make_unique(isExclusive); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index b8765b373e7a..f339312ecfd7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -36,7 +36,8 @@ class IDataSource { protected: THashMap Intervals; - std::shared_ptr StageData; + std::unique_ptr StageData; + std::unique_ptr StageResult; TAtomic FilterStageFlag = 0; bool IsReadyFlag = false; @@ -51,8 +52,17 @@ class IDataSource { virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const TFetchedResult& GetStageResult() const { + AFL_VERIFY(!!StageResult); + return *StageResult; + } + void SetIsReady(); + void Finalize() { + StageResult = std::make_unique(std::move(StageData)); + } + bool IsEmptyData() const { return GetStageData().IsEmpty(); } From 3b13c32a363fc50c6a2a251c6068596c85dc8c5d Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 31 Jan 2024 15:02:59 +0300 Subject: [PATCH 6/6] correction --- .../engines/reader/plain_reader/fetched_data.h | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index 8647345d6ce5..62d3c1fcb3b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -48,13 +48,6 @@ class TFetchedData { } } - std::shared_ptr GetBatch() const { - if (!Table) { - return nullptr; - } - return NArrow::ToBatch(Table, true); - } - void AddNulls(THashMap&& blobs) { for (auto&& i : blobs) { AFL_VERIFY(Blobs.emplace(i.first, i.second).second); @@ -115,9 +108,10 @@ class TFetchedResult { YDB_READONLY_DEF(std::shared_ptr, NotAppliedFilter); public: TFetchedResult(std::unique_ptr&& data) - : Batch(data->GetBatch()) - , NotAppliedFilter(data->GetNotAppliedFilter()) { - + : NotAppliedFilter(data->GetNotAppliedFilter()) { + if (data->GetTable()) { + Batch = NArrow::ToBatch(data->GetTable(), true); + } } };