diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index e09f6d221e8c..bdfb8d0355f1 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -323,7 +323,7 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr& batch, const } if (filter.IsTotalDenyFilter()) { batch = batch->Slice(0, 0); - return false; + return true; } if (filter.IsTotalAllowFilter()) { return true; @@ -343,11 +343,11 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr& batch, const return false; } -bool TColumnFilter::Apply(std::shared_ptr& batch, const std::optional startPos, const std::optional count) { +bool TColumnFilter::Apply(std::shared_ptr& batch, const std::optional startPos, const std::optional count) const { return ApplyImpl(*this, batch, startPos, count); } -bool TColumnFilter::Apply(std::shared_ptr& batch, const std::optional startPos, const std::optional count) { +bool TColumnFilter::Apply(std::shared_ptr& batch, const std::optional startPos, const std::optional count) const { return ApplyImpl(*this, batch, startPos, count); } diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index 309467b5be37..29dc8471c548 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -171,8 +171,8 @@ class TColumnFilter { // It makes a filter using composite predicate static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType); - bool Apply(std::shared_ptr& batch, const std::optional startPos = {}, const std::optional count = {}); - bool Apply(std::shared_ptr& batch, const std::optional startPos = {}, const std::optional count = {}); + bool Apply(std::shared_ptr& batch, const std::optional startPos = {}, const std::optional count = {}) const; + bool Apply(std::shared_ptr& batch, const std::optional startPos = {}, const std::optional count = {}) const; void Apply(const ui32 expectedRecordsCount, std::vector& datums) const; // Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions) diff --git a/ydb/core/tx/columnshard/columnshard__read_base.cpp b/ydb/core/tx/columnshard/columnshard__read_base.cpp index dbd0d638f636..000b8e7e63c8 100644 --- a/ydb/core/tx/columnshard/columnshard__read_base.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_base.cpp @@ -45,6 +45,7 @@ bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType, AFL_VERIFY(namesChecker.emplace(names.back()).second); } NOlap::TProgramContainer container; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "overriden_columns")("columns", JoinSeq(",", names)); container.OverrideProcessingColumns(std::vector(names.begin(), names.end())); read.SetProgram(std::move(container)); return true; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 114881eb4e27..e4c76ab6cccf 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -118,7 +118,7 @@ class TColumnShardScan : public TActorBootstrapped, NArrow::IR ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId())); ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); - std::shared_ptr context = std::make_shared(StoragesManager, ScanCountersPool, false, + std::shared_ptr context = std::make_shared(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); @@ -367,7 +367,7 @@ class TColumnShardScan : public TActorBootstrapped, NArrow::IR return Finish(); } - auto context = std::make_shared(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), + auto context = std::make_shared(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); } @@ -921,29 +921,17 @@ class TCurrentBatch { Results.emplace_back(std::move(res)); } - void FillResult(std::vector& result, const bool mergePartsToMax) const { + void FillResult(std::vector& result) const { if (Results.empty()) { return; } - if (mergePartsToMax) { - std::vector> batches; - std::vector> guards; - for (auto&& i : Results) { - batches.emplace_back(i.GetResultBatchPtrVerified()); - guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end()); - } - auto res = NArrow::CombineBatches(batches); - AFL_VERIFY(res); - result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey())); - } else { - for (auto&& i : Results) { - result.emplace_back(std::move(i)); - } + for (auto&& i : Results) { + result.emplace_back(std::move(i)); } } }; -std::vector TPartialReadResult::SplitResults(std::vector&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) { +std::vector TPartialReadResult::SplitResults(std::vector&& resultsExt, const ui32 maxRecordsInResult) { std::vector resultBatches; TCurrentBatch currentBatch; for (auto&& i : resultsExt) { @@ -960,7 +948,7 @@ std::vector TPartialReadResult::SplitResults std::vector result; for (auto&& i : resultBatches) { - i.FillResult(result, mergePartsToMax); + i.FillResult(result); } return result; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 5c20f2a5bdfb..2c6dd9c52a57 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -47,7 +47,7 @@ class TPartialReadResult { return ResultBatch.GetRecordsCount(); } - static std::vector SplitResults(std::vector&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax); + static std::vector SplitResults(std::vector&& resultsExt, const ui32 maxRecordsInResult); const NArrow::TShardedRecordBatch& GetShardedBatch() const { return ResultBatch; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 00e4fd205aef..2ba089bb80d9 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -97,7 +97,7 @@ class TPortionInfo { TSerializationStats GetSerializationStat(const ISnapshotSchema& schema) const { TSerializationStats result; for (auto&& i : Records) { - if (schema.GetFieldByColumnId(i.ColumnId)) { + if (schema.GetFieldByColumnIdOptional(i.ColumnId)) { result.AddStat(i.GetSerializationStat(schema.GetFieldByColumnIdVerified(i.ColumnId)->name())); } } @@ -151,8 +151,7 @@ class TPortionInfo { : PathId(pathId) , Portion(portionId) , MinSnapshot(minSnapshot) - , BlobsOperator(blobsOperator) - { + , BlobsOperator(blobsOperator) { } TString DebugString(const bool withDetails = false) const; @@ -399,22 +398,30 @@ class TPortionInfo { public: TAssembleBlobInfo(const ui32 rowsCount) : NullRowsCount(rowsCount) { - + AFL_VERIFY(NullRowsCount); } TAssembleBlobInfo(const TString& data) : Data(data) { - + AFL_VERIFY(!!Data); } ui32 GetNullRowsCount() const noexcept { - return NullRowsCount; + return NullRowsCount; } const TString& GetData() const noexcept { return Data; } + bool IsBlob() const { + return !NullRowsCount && !!Data; + } + + bool IsNull() const { + return NullRowsCount && !Data; + } + std::shared_ptr BuildRecordBatch(const TColumnLoader& loader) const; }; @@ -437,8 +444,7 @@ class TPortionInfo { TPreparedColumn(std::vector&& blobs, const std::shared_ptr& loader) : Loader(loader) - , Blobs(std::move(blobs)) - { + , Blobs(std::move(blobs)) { Y_ABORT_UNLESS(Loader); Y_ABORT_UNLESS(Loader->GetExpectedSchema()->num_fields() == 1); } @@ -505,8 +511,7 @@ class TPortionInfo { TPreparedBatchData(std::vector&& columns, std::shared_ptr schema, const size_t rowsCount) : Columns(std::move(columns)) , Schema(schema) - , RowsCount(rowsCount) - { + , RowsCount(rowsCount) { } std::shared_ptr Assemble(const TAssembleOptions& options = {}) const; @@ -525,8 +530,7 @@ class TPortionInfo { : ColumnId(resultLoader->GetColumnId()) , NumRows(numRows) , DataLoader(dataLoader) - , ResultLoader(resultLoader) - { + , ResultLoader(resultLoader) { AFL_VERIFY(ResultLoader); if (DataLoader) { AFL_VERIFY(ResultLoader->GetColumnId() == DataLoader->GetColumnId()); @@ -598,8 +602,8 @@ class TPortionInfo { } std::shared_ptr AssembleInBatch(const ISnapshotSchema& dataSchema, - const ISnapshotSchema& resultSchema, - THashMap& data) const { + const ISnapshotSchema& resultSchema, + THashMap& data) const { auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); Y_ABORT_UNLESS(batch->Validate().ok()); return batch; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp index 6e73043a447e..0d05fe444c02 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.cpp @@ -12,6 +12,12 @@ TString TColumnsSet::DebugString() const { } NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const { + if (external.IsEmpty()) { + return *this; + } + if (IsEmpty()) { + return external; + } TColumnsSet result = *this; for (auto&& i : external.ColumnIds) { result.ColumnIds.erase(i); @@ -28,6 +34,12 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS } NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const { + if (external.IsEmpty()) { + return *this; + } + if (IsEmpty()) { + return external; + } TColumnsSet result = *this; result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end()); auto fields = result.Schema->fields(); @@ -42,7 +54,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS } bool TColumnsSet::ColumnsOnly(const std::vector& fieldNames) const { - if (fieldNames.size() != GetSize()) { + if (fieldNames.size() != GetColumnsCount()) { return false; } std::set fieldNamesSet; @@ -64,11 +76,7 @@ void TColumnsSet::Rebuild() { ColumnNamesVector.emplace_back(i); ColumnNames.emplace(i); } - if (ColumnIds.size()) { - FilteredSchema = std::make_shared(FullReadSchema, ColumnIds); - } else { - FilteredSchema = FullReadSchema; - } + FilteredSchema = std::make_shared(FullReadSchema, ColumnIds); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h index 5c6eaecb0531..9b50c4cfaa12 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/columns_set.h @@ -18,12 +18,19 @@ class TColumnsSet { public: TColumnsSet() = default; + bool IsEmpty() const { + return ColumnIds.empty(); + } + + bool operator!() const { + return IsEmpty(); + } const std::vector& GetColumnNamesVector() const { return ColumnNamesVector; } - ui32 GetSize() const { + ui32 GetColumnsCount() const { return ColumnIds.size(); } @@ -96,27 +103,4 @@ class TColumnsSet { TColumnsSet operator-(const TColumnsSet& external) const; }; -class TFetchingPlan { -private: - YDB_READONLY_DEF(std::shared_ptr, FilterStage); - YDB_READONLY_DEF(std::shared_ptr, FetchingStage); - bool CanUseEarlyFilterImmediatelyFlag = false; -public: - TFetchingPlan(const std::shared_ptr& filterStage, const std::shared_ptr& fetchingStage, const bool canUseEarlyFilterImmediately) - : FilterStage(filterStage) - , FetchingStage(fetchingStage) - , CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) { - - } - - TString DebugString() const { - return TStringBuilder() << "{filter=" << (FilterStage ? FilterStage->DebugString() : "NO") << ";fetching=" << - (FetchingStage ? FetchingStage->DebugString() : "NO") << ";use_filter=" << CanUseEarlyFilterImmediatelyFlag << "}"; - } - - bool CanUseEarlyFilterImmediately() const { - return CanUseEarlyFilterImmediatelyFlag; - } -}; - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp deleted file mode 100644 index 8ae01b74e1ac..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "committed_assembler.h" -#include "plain_read_data.h" - -namespace NKikimr::NOlap::NPlainReader { - -bool TCommittedAssembler::DoExecute() { - ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion)); - Y_ABORT_UNLESS(ResultBatch); - ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot); - Y_ABORT_UNLESS(ResultBatch); - ReadMetadata->GetPKRangesFilter().BuildFilter(ResultBatch).Apply(ResultBatch); - auto t = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({ResultBatch})); - EarlyFilter = ReadMetadata->GetProgram().ApplyEarlyFilter(t, false); - ResultBatch = NArrow::ToBatch(t, true); - return true; -} - -bool TCommittedAssembler::DoApply(IDataReader& /*owner*/) const { - if (Source->GetFetchingPlan().GetFilterStage()->GetSchema()) { - Source->InitFilterStageData(nullptr, EarlyFilter, NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFilterStage()->GetSchema(), true), Source); - } else { - Source->InitFilterStageData(nullptr, EarlyFilter, nullptr, Source); - } - if (Source->GetFetchingPlan().GetFetchingStage()->GetSchema()) { - Source->InitFetchStageData(NArrow::ExtractColumns(ResultBatch, Source->GetFetchingPlan().GetFetchingStage()->GetSchema(), true)); - } else { - Source->InitFetchStageData(nullptr); - } - return true; -} - -TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr& source, - const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard) - : TBase(scanActorId) - , BlobData(blobData) - , ReadMetadata(readMetadata) - , Source(source) - , SchemaVersion(cBlob.GetSchemaVersion()) - , DataSnapshot(cBlob.GetSnapshot()) - , TaskGuard(std::move(taskGuard)) -{ -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h deleted file mode 100644 index eafcc342d990..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once -#include "source.h" -#include -#include -#include -#include -#include -#include - -namespace NKikimr::NOlap::NPlainReader { -class TCommittedAssembler: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TString BlobData; - TReadMetadata::TConstPtr ReadMetadata; - const std::shared_ptr Source; - ui64 SchemaVersion; - TSnapshot DataSnapshot; - - std::shared_ptr EarlyFilter; - std::shared_ptr ResultBatch; - const NColumnShard::TCounterGuard TaskGuard; -protected: - virtual bool DoExecute() override; - virtual bool DoApply(IDataReader& owner) const override; -public: - virtual TString GetTaskClassIdentifier() const override { - return "PlainReader::TCommittedAssembler"; - } - - TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const std::shared_ptr& source, - const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard); -}; -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index 9c06905261a7..a4ecce5de374 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -1,51 +1,20 @@ #include "constructor.h" -#include "filter_assembler.h" -#include "column_assembler.h" -#include "committed_assembler.h" -#include -#include -#include #include namespace NKikimr::NOlap::NPlainReader { -THashMap TAssembleColumnsTaskConstructor::BuildBatchAssembler() { - auto blobs = ExtractBlobsData(); - THashMap blobsDataAssemble; - for (auto&& i : blobs) { - blobsDataAssemble.emplace(i.first, i.second); - } - for (auto&& i : NullBlocks) { - AFL_VERIFY(blobsDataAssemble.emplace(i.first, i.second).second); - } - return blobsDataAssemble; -} - -void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { - auto task = std::make_shared(Context, PortionInfo, Source, Columns, UseEarlyFilter, BuildBatchAssembler()); - task->SetPriority(NConveyor::ITask::EPriority::Normal); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); -} - -void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { - auto task = std::make_shared(Context, PortionInfo, Source, Columns, BuildBatchAssembler(), AppliedFilter); - task->SetPriority(NConveyor::ITask::EPriority::High); - NConveyor::TScanServiceOperator::SendTaskToExecute(task); -} - -void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { - auto blobs = ExtractBlobsData(); - Y_ABORT_UNLESS(NullBlocks.size() == 0); - Y_ABORT_UNLESS(blobs.size() == 1); - auto task = std::make_shared(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second, - Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()); - task->SetPriority(NConveyor::ITask::EPriority::High); +void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr& /*resourcesGuard*/) { + Source->MutableStageData().AddBlobs(ExtractBlobsData()); + AFL_VERIFY(Step->GetNextStep()); + auto task = std::make_shared(Source, Step->GetNextStep(), Context->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } -bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus()); - NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), std::make_unique(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); +bool TBlobsFetcherTask::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId()) + ("status", status.GetErrorMessage())("status_code", status.GetStatus()); + NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(), + std::make_unique(TConclusionStatus::Fail("cannot read blob range " + range.ToString()))); return false; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index 54105ee81efd..3b5ceca5200e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -8,87 +8,25 @@ namespace NKikimr::NOlap::NPlainReader { -class IFetchTaskConstructor: public NBlobOperations::NRead::ITask { +class TBlobsFetcherTask: public NBlobOperations::NRead::ITask { private: using TBase = NBlobOperations::NRead::ITask; -protected: const std::shared_ptr Source; + const std::shared_ptr Step; const std::shared_ptr Context; - THashMap NullBlocks; - NColumnShard::TCounterGuard TasksGuard; + + virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override; public: - IFetchTaskConstructor(const std::shared_ptr& context, const std::vector>& readActions, THashMap&& nullBlocks, - const std::shared_ptr& sourcePtr, const TString& taskCustomer) - : TBase(readActions, taskCustomer) + TBlobsFetcherTask(const std::vector>& readActions, const std::shared_ptr& sourcePtr, + const std::shared_ptr& step, const std::shared_ptr& context, const TString& taskCustomer, const TString& externalTaskId) + : TBase(readActions, taskCustomer, externalTaskId) , Source(sourcePtr) + , Step(step) , Context(context) - , NullBlocks(std::move(nullBlocks)) - , TasksGuard(context->GetCommonContext()->GetCounters().GetReadTasksGuard()) - { - } -}; - -class TCommittedColumnsTaskConstructor: public IFetchTaskConstructor { -private: - TCommittedBlob CommittedBlob; - using TBase = IFetchTaskConstructor; -protected: - virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; -public: - TCommittedColumnsTaskConstructor(const std::shared_ptr& context, const std::vector>& readActions, THashMap&& nullBlocks, - const TCommittedDataSource& source, const std::shared_ptr& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer) - , CommittedBlob(source.GetCommitted()) - { - - } -}; - -class TAssembleColumnsTaskConstructor: public IFetchTaskConstructor { -private: - using TBase = IFetchTaskConstructor; -protected: - std::shared_ptr Columns; - std::shared_ptr PortionInfo; - THashMap BuildBatchAssembler(); -public: - TAssembleColumnsTaskConstructor(const std::shared_ptr& context, const std::vector>& readActions, THashMap&& nullBlocks, - const std::shared_ptr& columns, const TPortionDataSource& portion, const std::shared_ptr& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), sourcePtr, taskCustomer) - , Columns(columns) - , PortionInfo(portion.GetPortionInfoPtr()) { } }; -class TFFColumnsTaskConstructor: public TAssembleColumnsTaskConstructor { -private: - using TBase = TAssembleColumnsTaskConstructor; - std::shared_ptr AppliedFilter; - virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; -public: - TFFColumnsTaskConstructor(const std::shared_ptr& context, const std::vector>& readActions, THashMap&& nullBlocks, - const std::shared_ptr& columns, const TPortionDataSource& portion, const std::shared_ptr& sourcePtr, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer) - , AppliedFilter(portion.GetFilterStageData().GetAppliedFilter()) - { - } -}; - -class TEFTaskConstructor: public TAssembleColumnsTaskConstructor { -private: - bool UseEarlyFilter = false; - using TBase = TAssembleColumnsTaskConstructor; - virtual void DoOnDataReady(const std::shared_ptr& resourcesGuard) override; -public: - TEFTaskConstructor(const std::shared_ptr& context, const std::vector>& readActions, THashMap&& nullBlocks, - const std::shared_ptr& columns, const TPortionDataSource& portion, const std::shared_ptr& sourcePtr, const bool useEarlyFilter, const TString& taskCustomer) - : TBase(context, readActions, std::move(nullBlocks), columns, portion, sourcePtr, taskCustomer) - , UseEarlyFilter(useEarlyFilter) - { - } -}; - } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index 35a90e520bb2..78a5da19a253 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -8,14 +8,145 @@ std::shared_ptr TSpecialRea } ui64 TSpecialReadContext::GetMemoryForSources(const std::map>& sources, const bool isExclusive) { - auto fetchingPlan = GetColumnsFetchingPlan(isExclusive); ui64 result = 0; for (auto&& i : sources) { + auto fetchingPlan = GetColumnsFetchingPlan(i.second, isExclusive); AFL_VERIFY(i.second->GetIntervalsCount()); - result += i.second->GetRawBytes(fetchingPlan.GetFilterStage()->GetColumnIds()) / i.second->GetIntervalsCount(); - result += i.second->GetRawBytes(fetchingPlan.GetFetchingStage()->GetColumnIds()) / i.second->GetIntervalsCount(); + result += fetchingPlan->PredictRawBytes(i.second) / i.second->GetIntervalsCount(); } return result; } +std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source, const bool exclusiveSource) const { + const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; + if (!result) { + return std::make_shared(source->GetRecordsCount(), "fake"); + } + return result; +} + +std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const { + if (!EFColumns->GetColumnsCount()) { + TColumnsSet columnsFetch = *FFColumns; + if (needSnapshots) { + columnsFetch = columnsFetch + *SpecColumns; + } + if (!exclusiveSource) { + columnsFetch = columnsFetch + *PKColumns + *SpecColumns; + } + if (columnsFetch.GetColumnsCount()) { + std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "simple"); + std::shared_ptr current = result; + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch))); + return result; + } else { + return nullptr; + } + } else if (exclusiveSource) { + TColumnsSet columnsFetch = *EFColumns; + if (needSnapshots || FFColumns->Contains(SpecColumns)) { + columnsFetch = columnsFetch + *SpecColumns; + } + AFL_VERIFY(columnsFetch.GetColumnsCount()); + std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "ef"); + std::shared_ptr current = result; + + if (needSnapshots || FFColumns->Contains(SpecColumns)) { + current = current->AttachNext(std::make_shared(SpecColumns)); + current = current->AttachNext(std::make_shared()); + columnsFetch = columnsFetch - *SpecColumns; + } + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch))); + if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + current = current->AttachNext(std::make_shared()); + } + for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + if (!i->IsFilterOnly()) { + break; + } + current = current->AttachNext(std::make_shared(i)); + } + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns; + if (columnsAdditionalFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); + current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); + } + return result; + } else { + TColumnsSet columnsFetch = *MergeColumns + *EFColumns; + AFL_VERIFY(columnsFetch.GetColumnsCount()); + std::shared_ptr result = std::make_shared(std::make_shared(columnsFetch), "full"); + std::shared_ptr current = result; + + current = current->AttachNext(std::make_shared(SpecColumns)); + if (needSnapshots) { + current = current->AttachNext(std::make_shared()); + } + current = current->AttachNext(std::make_shared(PKColumns)); + if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + current = current->AttachNext(std::make_shared()); + } + const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns; + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetchEF))); + for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { + if (!i->IsFilterOnly()) { + break; + } + current = current->AttachNext(std::make_shared(i)); + } + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns; + if (columnsAdditionalFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); + current = current->AttachNext(std::make_shared(std::make_shared(columnsAdditionalFetch))); + } + return result; + } +} + +TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) + : CommonContext(commonContext) +{ + ReadMetadata = dynamic_pointer_cast(CommonContext->GetReadMetadata()); + Y_ABORT_UNLESS(ReadMetadata); + Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + + auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); + SpecColumns = std::make_shared(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); + { + auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); + if (efColumns.size()) { + EFColumns = std::make_shared(efColumns, ReadMetadata->GetIndexInfo(), readSchema); + } else { + EFColumns = std::make_shared(); + } + } + if (ReadMetadata->HasProcessingColumnIds()) { + FFColumns = std::make_shared(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) { + FFColumns = std::make_shared(*EFColumns + *SpecColumns); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString()); + } else { + AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString()); + } + } else { + FFColumns = EFColumns; + } + if (FFColumns->IsEmpty()) { + ProgramInputColumns = SpecColumns; + } else { + ProgramInputColumns = FFColumns; + } + + PKColumns = std::make_shared(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + MergeColumns = std::make_shared(*PKColumns + *SpecColumns); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); + CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false); + CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true); + CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false); + CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index 2c6f213143c4..3fe05f23d8b3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -1,5 +1,6 @@ #pragma once #include "columns_set.h" +#include "fetching.h" #include #include @@ -23,8 +24,8 @@ class TSpecialReadContext { std::shared_ptr PKFFColumns; std::shared_ptr EFPKColumns; std::shared_ptr FFMinusEFColumns; - std::shared_ptr FFMinusEFPKColumns; - bool TrivialEFFlag = false; + std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const; + std::array, 2>, 2> CacheFetchingScripts; public: ui64 GetMemoryForSources(const std::map>& sources, const bool isExclusive); @@ -43,65 +44,9 @@ class TSpecialReadContext { ; } - TSpecialReadContext(const std::shared_ptr& commonContext) - : CommonContext(commonContext) - { - ReadMetadata = dynamic_pointer_cast(CommonContext->GetReadMetadata()); - Y_ABORT_UNLESS(ReadMetadata); - Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + TSpecialReadContext(const std::shared_ptr& commonContext); - auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); - SpecColumns = std::make_shared(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); - { - auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); - if (efColumns.size()) { - EFColumns = std::make_shared(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - } else { - EFColumns = SpecColumns; - } - } - *EFColumns = *EFColumns + *SpecColumns; - if (ReadMetadata->HasProcessingColumnIds()) { - FFColumns = std::make_shared(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString()); - *FFColumns = *FFColumns + *EFColumns; - } else { - FFColumns = std::make_shared(*EFColumns); - } - ProgramInputColumns = FFColumns; - - PKColumns = std::make_shared(ReadMetadata->GetPKColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); - MergeColumns = std::make_shared(*PKColumns + *SpecColumns); - - TrivialEFFlag = EFColumns->ColumnsOnly(ReadMetadata->GetIndexInfo().ArrowSchemaSnapshot()->field_names()); - - PKFFColumns = std::make_shared(*PKColumns + *FFColumns); - EFPKColumns = std::make_shared(*EFColumns + *PKColumns); - FFMinusEFColumns = std::make_shared(*FFColumns - *EFColumns); - FFMinusEFPKColumns = std::make_shared(*FFColumns - *EFColumns - *PKColumns); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); - } - - TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const { - if (CommonContext->GetIsInternalRead()) { - return TFetchingPlan(PKFFColumns, EmptyColumns, exclusiveSource); - } - - if (exclusiveSource) { - if (TrivialEFFlag) { - return TFetchingPlan(FFColumns, EmptyColumns, true); - } else { - return TFetchingPlan(EFColumns, FFMinusEFColumns, true); - } - } else { - if (TrivialEFFlag) { - return TFetchingPlan(PKFFColumns, EmptyColumns, false); - } else { - return TFetchingPlan(EFPKColumns, FFMinusEFPKColumns, false); - } - } - } + std::shared_ptr GetColumnsFetchingPlan(const std::shared_ptr& source, const bool exclusiveSource) const; }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp index 1c9e0df7ff58..d4f1a808b742 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.cpp @@ -1,5 +1,17 @@ #include "fetched_data.h" +#include +#include namespace NKikimr::NOlap { +void TFetchedData::SyncTableColumns(const std::vector>& fields) { + for (auto&& i : fields) { + if (Table->GetColumnByName(i->name())) { + continue; + } + Table = NArrow::TStatusValidator::GetValid(Table->AddColumn(Table->num_columns(), i, + std::make_shared(NArrow::TThreadSimpleArraysCache::GetNull(i->type(), Table->num_rows())))); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h index c07efcaf473b..4c29f220d34f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetched_data.h @@ -1,45 +1,109 @@ #pragma once -#include +#include #include #include +#include +#include #include +#include namespace NKikimr::NOlap { class TFetchedData { protected: - YDB_READONLY_DEF(std::shared_ptr, Batch); + using TBlobs = THashMap; + YDB_ACCESSOR_DEF(TBlobs, Blobs); + YDB_READONLY_DEF(std::shared_ptr, Table); + YDB_READONLY_DEF(std::shared_ptr, Filter); + YDB_READONLY(bool, UseFilter, false); public: - TFetchedData(const std::shared_ptr& batch) - : Batch(batch) + TFetchedData(const bool useFilter) + : UseFilter(useFilter) { + + } + + void SyncTableColumns(const std::vector>& fields); + + std::shared_ptr GetAppliedFilter() const { + return UseFilter ? Filter : nullptr; + } + + std::shared_ptr GetNotAppliedFilter() const { + return UseFilter ? nullptr : Filter; + } + + TString ExtractBlob(const TBlobRange& bRange) { + auto it = Blobs.find(bRange); + AFL_VERIFY(it != Blobs.end()); + AFL_VERIFY(it->second.IsBlob()); + auto result = it->second.GetData(); + Blobs.erase(it); + return result; + } + + void AddBlobs(THashMap&& blobs) { + for (auto&& i : blobs) { + AFL_VERIFY(Blobs.emplace(i.first, std::move(i.second)).second); + } + } + + std::shared_ptr GetBatch() const { + return NArrow::ToBatch(Table, true); + } + + void AddNulls(THashMap&& blobs) { + for (auto&& i : blobs) { + AFL_VERIFY(Blobs.emplace(i.first, i.second).second); + } } -}; -class TFilterStageData: public TFetchedData { -private: - using TBase = TFetchedData; - YDB_READONLY_DEF(std::shared_ptr, AppliedFilter); - YDB_READONLY_DEF(std::shared_ptr, NotAppliedEarlyFilter); -public: bool IsEmptyFilter() const { - return (AppliedFilter && AppliedFilter->IsTotalDenyFilter()) || (NotAppliedEarlyFilter && NotAppliedEarlyFilter->IsTotalDenyFilter()); + return Filter && Filter->IsTotalDenyFilter(); } - TFilterStageData(const std::shared_ptr& appliedFilter, const std::shared_ptr& earlyFilter, const std::shared_ptr& batch) - : TBase(batch) - , AppliedFilter(appliedFilter) - , NotAppliedEarlyFilter(earlyFilter) - { + bool IsEmpty() const { + return IsEmptyFilter() || (Table && !Table->num_rows()); + } + void AddFilter(const std::shared_ptr& filter) { + if (UseFilter && Table && filter) { + AFL_VERIFY(filter->Apply(Table)); + } + if (!Filter) { + Filter = filter; + } else if (filter) { + *Filter = Filter->CombineSequentialAnd(*filter); + } + } + + void AddFilter(const NArrow::TColumnFilter& filter) { + if (UseFilter && Table) { + AFL_VERIFY(filter.Apply(Table)); + } + if (!Filter) { + Filter = std::make_shared(filter); + } else { + *Filter = Filter->CombineSequentialAnd(filter); + } + } + + void AddBatch(const std::shared_ptr& batch) { + return AddBatch(arrow::Table::Make(batch->schema(), batch->columns(), batch->num_rows())); + } + + void AddBatch(const std::shared_ptr& table) { + auto tableLocal = table; + if (Filter && UseFilter) { + AFL_VERIFY(Filter->Apply(tableLocal)); + } + if (!Table) { + Table = tableLocal; + } else { + AFL_VERIFY(NArrow::MergeBatchColumns({Table, tableLocal}, Table)); + } } -}; -class TFetchStageData: public TFetchedData { -private: - using TBase = TFetchedData; -public: - using TBase::TBase; }; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp new file mode 100644 index 000000000000..3058b64781c0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -0,0 +1,71 @@ +#include "fetching.h" +#include "source.h" +#include +#include + +namespace NKikimr::NOlap::NPlainReader { + +bool TStepAction::DoApply(IDataReader& /*owner*/) const { + if (FinishedFlag) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); + Source->SetIsReady(); + } + return true; +} + +bool TStepAction::DoExecute() { + while (Step) { + if (!Step->ExecuteInplace(Source, Step)) { + return true; + } + if (Source->IsEmptyData()) { + FinishedFlag = true; + return true; + } + Step = Step->GetNextStep(); + } + FinishedFlag = true; + return true; +} + +bool TBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const { + return !source->StartFetchingColumns(source, step, Columns); +} + +ui64 TBlobsFetchingStep::PredictRawBytes(const std::shared_ptr& source) const { + return source->GetRawBytes(Columns->GetColumnIds()); +} + +bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + source->AssembleColumns(Columns); + return true; +} + +bool TFilterProgramStep::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + auto filter = Step->BuildFilter(source->GetStageData().GetTable()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TPredicateFilter::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TSnapshotFilter::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + auto filter = MakeSnapshotFilter(source->GetStageData().GetTable(), source->GetContext()->GetReadMetadata()->GetSnapshot()); + source->MutableStageData().AddFilter(filter); + return true; +} + +bool TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const { + std::vector> columns; + for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) { + columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), std::make_shared(0), Count)); + } + source->MutableStageData().AddBatch(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns)); + return true; +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h new file mode 100644 index 000000000000..297863392a7b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.h @@ -0,0 +1,174 @@ +#pragma once +#include "columns_set.h" +#include +#include +#include +#include +#include + +namespace NKikimr::NOlap::NPlainReader { +class IDataSource; + +class IFetchingStep { +private: + std::shared_ptr NextStep; + YDB_READONLY_DEF(TString, Name); + YDB_READONLY(ui32, Index, 0); + YDB_READONLY_DEF(TString, BranchName); +protected: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const = 0; + virtual TString DoDebugString() const { + return ""; + } +public: + virtual ~IFetchingStep() = default; + + std::shared_ptr AttachNext(const std::shared_ptr& nextStep) { + AFL_VERIFY(nextStep); + NextStep = nextStep; + nextStep->Index = Index + 1; + nextStep->BranchName = BranchName; + return nextStep; + } + + virtual ui64 PredictRawBytes(const std::shared_ptr& /*source*/) const { + return 0; + } + + bool ExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", DebugString())("scan_step_idx", GetIndex()); + return DoExecuteInplace(source, step); + } + + const std::shared_ptr& GetNextStep() const { + return NextStep; + } + + IFetchingStep(const TString& name, const TString& branchName = Default()) + : Name(name) + , BranchName(branchName) + { + + } + + TString DebugString() const { + TStringBuilder sb; + sb << "name=" << Name << ";" << DoDebugString() << ";branch=" << BranchName << ";"; + if (NextStep) { + sb << "next=" << NextStep->DebugString() << ";"; + } + return sb; + } +}; + +class TStepAction: public NColumnShard::IDataTasksProcessor::ITask { +private: + using TBase = NColumnShard::IDataTasksProcessor::ITask; + std::shared_ptr Source; + std::shared_ptr Step; + bool FinishedFlag = false; +protected: + virtual bool DoApply(IDataReader& /*owner*/) const override; + virtual bool DoExecute() override; +public: + virtual TString GetTaskClassIdentifier() const override { + return "STEP_ACTION"; + } + + TStepAction(const std::shared_ptr& source, const std::shared_ptr& step, const NActors::TActorId& ownerActorId) + : TBase(ownerActorId) + , Source(source) + , Step(step) + { + + } +}; + +class TBuildFakeSpec: public IFetchingStep { +private: + using TBase = IFetchingStep; + const ui32 Count = 0; +protected: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; +public: + TBuildFakeSpec(const ui32 count, const TString& nameBranch = "") + : TBase("FAKE_SPEC", nameBranch) + , Count(count) + { + AFL_VERIFY(Count); + } +}; + +class TBlobsFetchingStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + std::shared_ptr Columns; +protected: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const override; + virtual ui64 PredictRawBytes(const std::shared_ptr& source) const override; + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } +public: + TBlobsFetchingStep(const std::shared_ptr& columns, const TString& nameBranch = "") + : TBase("FETCHING", nameBranch) + , Columns(columns) + { + AFL_VERIFY(Columns); + } +}; + +class TAssemblerStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + YDB_READONLY_DEF(std::shared_ptr, Columns); + virtual TString DoDebugString() const override { + return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; + } +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& /*step*/) const override; + TAssemblerStep(const std::shared_ptr& columns) + : TBase("ASSEMBLER") + , Columns(columns) + { + AFL_VERIFY(Columns); + } +}; + +class TFilterProgramStep: public IFetchingStep { +private: + using TBase = IFetchingStep; + std::shared_ptr Step; +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const override; + TFilterProgramStep(const std::shared_ptr& step) + : TBase("PROGRAM") + , Step(step) + { + + } +}; + +class TPredicateFilter: public IFetchingStep { +private: + using TBase = IFetchingStep; +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const override; + TPredicateFilter() + : TBase("PREDICATE") { + + } +}; + +class TSnapshotFilter: public IFetchingStep { +private: + using TBase = IFetchingStep; +public: + virtual bool DoExecuteInplace(const std::shared_ptr& source, const std::shared_ptr& step) const override; + TSnapshotFilter() + : TBase("SNAPSHOT") { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp deleted file mode 100644 index 7b12b1ebf01e..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ /dev/null @@ -1,231 +0,0 @@ -#include "filter_assembler.h" -#include "plain_read_data.h" -#include -#include - -namespace NKikimr::NOlap::NPlainReader { - -class TFilterContext { -private: - TPortionInfo::TPreparedBatchData BatchAssembler; - YDB_ACCESSOR_DEF(std::shared_ptr, Context); - YDB_ACCESSOR_DEF(std::set, ReadyColumnIds); - std::optional OriginalCount; - YDB_READONLY(std::shared_ptr, AppliedFilter, std::make_shared(NArrow::TColumnFilter::BuildAllowFilter())); - YDB_READONLY_DEF(std::shared_ptr, EarlyFilter); - YDB_READONLY_DEF(std::shared_ptr, ResultTable); -public: - TFilterContext(TPortionInfo::TPreparedBatchData&& batchAssembler, const std::shared_ptr& context) - : BatchAssembler(std::move(batchAssembler)) - , Context(context) - { - - } - - ui32 GetOriginalCountVerified() const { - AFL_VERIFY(OriginalCount); - return *OriginalCount; - } - - std::shared_ptr AppendToResult(const std::set& columnIds, const std::optional> constantFill = {}) { - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.IncludedColumnIds = columnIds; - for (auto it = options.IncludedColumnIds->begin(); it != options.IncludedColumnIds->end();) { - if (!ReadyColumnIds.emplace(*it).second) { - it = options.IncludedColumnIds->erase(it); - } else { - ++it; - } - } - if (options.IncludedColumnIds->empty()) { - AFL_VERIFY(ResultTable); - return ResultTable; - } - if (constantFill) { - for (auto&& i : *options.IncludedColumnIds) { - options.ConstantColumnIds.emplace(i, *constantFill); - } - } - std::shared_ptr table = BatchAssembler.AssembleTable(options); - AFL_VERIFY(table); - if (!OriginalCount) { - OriginalCount = table->num_rows(); - } - AFL_VERIFY(AppliedFilter->Apply(table)); - if (!ResultTable) { - ResultTable = table; - } else { - AFL_VERIFY(NArrow::MergeBatchColumns({ResultTable, table}, ResultTable)); - } - return ResultTable; - } - - void ApplyFilter(const std::shared_ptr& filter, const bool useFilter) { - if (useFilter) { - filter->Apply(ResultTable); - *AppliedFilter = AppliedFilter->CombineSequentialAnd(*filter); - } else if (EarlyFilter) { - *EarlyFilter = EarlyFilter->And(*filter); - } else { - EarlyFilter = filter; - } - } -}; - -class IFilterConstructor { -private: - const std::set ColumnIds; - const std::optional> ConstantFill; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& filterContext, const std::shared_ptr& data) const = 0; -public: - IFilterConstructor(const std::set& columnIds, const std::optional> constantFill = {}) - : ColumnIds(columnIds) - , ConstantFill(constantFill) - { - AFL_VERIFY(ColumnIds.size()); - } - - virtual ~IFilterConstructor() = default; - - void Execute(TFilterContext& filterContext, const bool useFilter) { - auto result = filterContext.AppendToResult(ColumnIds, ConstantFill); - auto localFilter = BuildFilter(filterContext, result); - AFL_VERIFY(!!localFilter); - filterContext.ApplyFilter(localFilter, useFilter); - } -}; - -class TPredicateFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& filterContext, const std::shared_ptr& data) const override { - return std::make_shared(filterContext.GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(data)); - } -public: - TPredicateFilter(const std::shared_ptr& ctx) - : TBase(ctx->GetReadMetadata()->GetPKRangesFilter().GetColumnIds(ctx->GetReadMetadata()->GetIndexInfo())) { - - } -}; - -class TRestoreMergeData: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr& /*data*/) const override { - return std::make_shared(NArrow::TColumnFilter::BuildAllowFilter()); - } -public: - TRestoreMergeData(const std::shared_ptr& ctx) - : TBase(ctx->GetMergeColumns()->GetColumnIds()) { - - } -}; - -class TFakeSnapshotData: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr& /*data*/) const override { - return std::make_shared(NArrow::TColumnFilter::BuildAllowFilter()); - } -public: - TFakeSnapshotData(const std::shared_ptr& ctx) - : TBase(ctx->GetSpecColumns()->GetColumnIds(), std::make_shared(0)) { - - } -}; - -class TSnapshotFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& filterContext, const std::shared_ptr& data) const override { - return std::make_shared(MakeSnapshotFilter(data, filterContext.GetContext()->GetReadMetadata()->GetSnapshot())); - } -public: - TSnapshotFilter(const std::shared_ptr& ctx) - : TBase(ctx->GetSpecColumns()->GetColumnIds()) { - - } -}; - -class TProgramStepFilter: public IFilterConstructor { -private: - using TBase = IFilterConstructor; - std::shared_ptr Step; -protected: - virtual std::shared_ptr BuildFilter(const TFilterContext& /*filterContext*/, const std::shared_ptr& data) const override { - return Step->BuildFilter(data); - } -public: - TProgramStepFilter(const std::shared_ptr& step) - : TBase(step->GetFilterOriginalColumnIds()) - , Step(step) - { - } -}; - -bool TAssembleFilter::DoExecute() { - std::vector> filters; - if (!UseFilter) { - filters.emplace_back(std::make_shared(Context)); - } - if (FilterColumns->GetColumnIds().contains((ui32)TIndexInfo::ESpecialColumn::PLAN_STEP)) { - const bool needSnapshotsFilter = ReadMetadata->GetSnapshot() < RecordsMaxSnapshot; - if (needSnapshotsFilter) { - filters.emplace_back(std::make_shared(Context)); - } else { - filters.emplace_back(std::make_shared(Context)); - } - } - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { - filters.emplace_back(std::make_shared(Context)); - } - - for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { - if (!i->IsFilterOnly()) { - break; - } - - filters.emplace_back(std::make_shared(i)); - } - - TFilterContext filterContext(BuildBatchConstructor(FilterColumns->GetFilteredSchemaVerified()), Context); - - for (auto&& f : filters) { - f->Execute(filterContext, UseFilter); - AFL_VERIFY(filterContext.GetResultTable()); - if (filterContext.GetResultTable()->num_rows() == 0 || (filterContext.GetEarlyFilter() && filterContext.GetEarlyFilter()->IsTotalDenyFilter())) { - break; - } - } - - AppliedFilter = filterContext.GetAppliedFilter(); - EarlyFilter = filterContext.GetEarlyFilter(); - - auto batch = filterContext.GetResultTable(); - if (!batch || batch->num_rows() == 0) { - return true; - } - - OriginalCount = filterContext.GetOriginalCountVerified(); - auto fullTable = filterContext.AppendToResult(FilterColumns->GetColumnIds()); - AFL_VERIFY(AppliedFilter->IsTotalAllowFilter() || AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") - ("original_count", OriginalCount)("filtered_count", batch->num_rows())("use_filter", UseFilter) - ("filter_columns", FilterColumns->GetColumnIds().size())("af_count", AppliedFilter->Size())("ef_count", EarlyFilter ? EarlyFilter->Size() : 0); - - FilteredBatch = NArrow::ToBatch(fullTable, true); - return true; -} - -bool TAssembleFilter::DoApply(IDataReader& /*owner*/) const { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply"); - Source->InitFilterStageData(AppliedFilter, EarlyFilter, FilteredBatch, Source); - return true; -} - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h deleted file mode 100644 index 12492a6d8a41..000000000000 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h +++ /dev/null @@ -1,76 +0,0 @@ -#pragma once -#include -#include -#include -#include -#include -#include "source.h" -#include "columns_set.h" - -namespace NKikimr::NOlap::NPlainReader { - -class TAssemblerCommon: public NColumnShard::IDataTasksProcessor::ITask { -private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; -protected: - const std::shared_ptr Context; - const std::shared_ptr Source; - const std::shared_ptr PortionInfo; - const TReadMetadata::TConstPtr ReadMetadata; - THashMap Blobs; - - TPortionInfo::TPreparedBatchData BuildBatchConstructor(const ISnapshotSchema& resultSchema) { - auto blobSchema = ReadMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); - return PortionInfo->PrepareForAssemble(*blobSchema, resultSchema, Blobs); - } - -public: - TAssemblerCommon(const std::shared_ptr& context, const std::shared_ptr& portionInfo, - const std::shared_ptr& source, const THashMap& blobs) - : TBase(context->GetCommonContext()->GetScanActorId()) - , Context(context) - , Source(source) - , PortionInfo(portionInfo) - , ReadMetadata(Context->GetReadMetadata()) - , Blobs(blobs) - { - AFL_VERIFY(Blobs.size()); - } - -}; - -class TAssembleFilter: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter { -private: - using TBase = TAssemblerCommon; - - std::shared_ptr FilteredBatch; - std::shared_ptr AppliedFilter; - std::shared_ptr EarlyFilter; - const TSnapshot RecordsMaxSnapshot; - ui32 OriginalCount = 0; - std::shared_ptr FilterColumns; - const bool UseFilter = true; - const NColumnShard::TCounterGuard TaskGuard; -protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual bool DoExecute() override; -public: - - virtual TString GetTaskClassIdentifier() const override { - return "PlainReading::TAssembleFilter"; - } - - TAssembleFilter(const std::shared_ptr& context, const std::shared_ptr& portionInfo, - const std::shared_ptr& source, const std::shared_ptr& filterColumns, const bool useFilter, const THashMap& blobs) - : TBase(context, portionInfo, source, std::move(blobs)) - , RecordsMaxSnapshot(PortionInfo->RecordSnapshotMax()) - , FilterColumns(filterColumns) - , UseFilter(useFilter) - , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) - { - Y_UNUSED(RecordsMaxSnapshot); - TBase::SetPriority(TBase::EPriority::Normal); - } -}; - -} diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 464127ef0287..5b55803cd3c8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -45,11 +45,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { bool EmptyFiltersOnly() const { for (auto&& [_, i] : Sources) { - if (!i->GetFilterStageData().GetBatch() || !i->GetFilterStageData().GetBatch()->num_rows()) { - continue; - } - auto f = i->GetFilterStageData().GetNotAppliedEarlyFilter(); - if (!f || !f->IsTotalDenyFilter()) { + if (!i->IsEmptyData()) { return false; } } @@ -64,7 +60,7 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } virtual bool DoExecute() override { if (MergingContext->IsExclusiveInterval()) { - ResultBatch = Sources.begin()->second->GetBatch(); + ResultBatch = Sources.begin()->second->GetStageData().GetBatch(); if (ResultBatch && ResultBatch->num_rows()) { LastPK = Sources.begin()->second->GetLastPK(); ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); @@ -85,8 +81,8 @@ class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask { } std::shared_ptr merger = Context->BuildMerger(); for (auto&& [_, i] : Sources) { - if (auto rb = i->GetBatch()) { - merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter()); + if (auto rb = i->GetStageData().GetBatch()) { + merger->AddSource(rb, i->GetStageData().GetNotAppliedFilter()); } } AFL_VERIFY(merger->GetSourcesCount() <= Sources.size()); @@ -196,7 +192,7 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptrDebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size()); for (auto&& [_, i] : Sources) { - i->InitFetchingPlan(Context->GetColumnsFetchingPlan(MergingContext->IsExclusiveInterval()), i); + i->InitFetchingPlan(Context->GetColumnsFetchingPlan(i, MergingContext->IsExclusiveInterval()), i, MergingContext->IsExclusiveInterval()); } OnInitResourcesGuard(guard); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index d84609c623d1..0720416cddb2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -52,7 +52,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& conte stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size(); stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs(); stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size(); - stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetSize(); + stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount(); stats->CommittedPortionsBytes = committedPortionsBytes; stats->InsertedPortionsBytes = insertedPortionsBytes; stats->CompactedPortionsBytes = compactedPortionsBytes; @@ -60,10 +60,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& conte } std::vector TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { - if ((GetContext().GetIsInternalRead() && ReadyResultsCount < maxRowsInBatch) && !Scanner->IsFinished()) { - return {}; - } - auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch, GetContext().GetIsInternalRead()); + auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch); ui32 count = 0; for (auto&& r: result) { count += r.GetRecordsCount(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp index b80813b1d80c..7611a4488ff2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp @@ -131,10 +131,6 @@ bool TScanHead::IsReverse() const { return GetContext().GetReadMetadata()->IsDescSorted(); } -NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(const bool exclusiveSource) const { - return Context->GetColumnsFetchingPlan(exclusiveSource); -} - void TScanHead::Abort() { for (auto&& i : FetchingIntervals) { i.second->Abort(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h index 971cbc654b7c..8e071e8f54a5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h @@ -48,8 +48,6 @@ class TScanHead { ui64 ZeroCount = 0; public: - TFetchingPlan GetColumnsFetchingPlan(const bool exclusiveSource) const; - bool IsReverse() const; void Abort(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 254c6205b109..e6a408933f39 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -1,66 +1,49 @@ #include "source.h" #include "interval.h" #include "fetched_data.h" -#include "constructor.h" #include "plain_read_data.h" +#include "constructor.h" #include #include #include +#include +#include namespace NKikimr::NOlap::NPlainReader { -void IDataSource::InitFetchStageData(const std::shared_ptr& batchExt) { - auto batch = batchExt; - if (!batch && FetchingPlan->GetFetchingStage()->GetSize()) { - const ui32 numRows = GetFilterStageData().GetBatch() ? GetFilterStageData().GetBatch()->num_rows() : 0; - batch = NArrow::MakeEmptyBatch(FetchingPlan->GetFetchingStage()->GetSchema(), numRows); - } - if (batch) { - Y_ABORT_UNLESS((ui32)batch->num_columns() == FetchingPlan->GetFetchingStage()->GetSize()); - } - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchStageData")); - Y_ABORT_UNLESS(!FetchStageData); - FetchStageData = std::make_shared(batch); - for (auto&& i : Intervals) { - i.second->OnSourceFetchStageReady(GetSourceIdx()); - } - Intervals.clear(); -} - -void IDataSource::InitFilterStageData(const std::shared_ptr& appliedFilter, - const std::shared_ptr& earlyFilter, const std::shared_ptr& batch, const std::shared_ptr& sourcePtr) { - if (IsAborted()) { - return; - } - NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFilterStageData")); - Y_ABORT_UNLESS(!FilterStageData); - FilterStageData = std::make_shared(appliedFilter, earlyFilter, batch); - if (batch) { - AFL_VERIFY((ui32)batch->num_columns() == FetchingPlan->GetFilterStage()->GetSize())("batch", batch->schema()->ToString())("filter", FetchingPlan->GetFilterStage()->DebugString()); - } - DoStartFetchStage(sourcePtr); -} - -void IDataSource::InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr& sourcePtr) { +void IDataSource::InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive) { + AFL_VERIFY(fetchingFirstStep); if (AtomicCas(&FilterStageFlag, 1, 0)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingPlan.DebugString()); - Y_ABORT_UNLESS(!FetchingPlan); - FetchingPlan = fetchingPlan; + StageData = std::make_shared(isExclusive); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", fetchingFirstStep->DebugString())("source_idx", SourceIdx); NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); if (IsAborted()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "InitFetchingPlanAborted"); return; } - DoStartFilterStage(sourcePtr); + if (fetchingFirstStep->ExecuteInplace(sourcePtr, fetchingFirstStep)) { + auto task = std::make_shared(sourcePtr, fetchingFirstStep->GetNextStep(), Context->GetCommonContext()->GetScanActorId()); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); + } } } void IDataSource::RegisterInterval(TFetchingInterval& interval) { - if (!FetchStageData) { + if (!IsReadyFlag) { AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); } } +void IDataSource::SetIsReady() { + AFL_VERIFY(!IsReadyFlag); + IsReadyFlag = true; + for (auto&& i : Intervals) { + i.second->OnSourceFetchStageReady(SourceIdx); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "source_ready")("intervals_count", Intervals.size())("source_idx", SourceIdx); + Intervals.clear(); +} + void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, const std::shared_ptr& readingAction, THashMap& nullBlocks, const std::shared_ptr& filter) { @@ -90,65 +73,59 @@ void TPortionDataSource::NeedFetchColumns(const std::set& columnIds, AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)("reading_action", readingAction->GetStorageId())("columns", columnIds.size()); } -void TPortionDataSource::DoStartFilterStage(const std::shared_ptr& sourcePtr) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetchEF"); - Y_ABORT_UNLESS(FetchingPlan->GetFilterStage()->GetSize()); - auto& columnIds = FetchingPlan->GetFilterStage()->GetColumnIds(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "StartFilterStage")("fetching_info", FetchingPlan->DebugString()); +bool TPortionDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, + const std::shared_ptr& step, const std::shared_ptr& columns) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName()); + Y_ABORT_UNLESS(columns->GetColumnsCount()); + auto& columnIds = columns->GetColumnIds(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FILTER"); + auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::" + step->GetName()); readAction->SetIsBackgroundProcess(false); - THashMap nullBlocks; - NeedFetchColumns(columnIds, readAction, nullBlocks, nullptr); + { + THashMap nullBlocks; + NeedFetchColumns(columnIds, readAction, nullBlocks, StageData->GetAppliedFilter()); + StageData->AddNulls(std::move(nullBlocks)); + } std::vector> actions = {readAction}; - auto constructor = std::make_shared(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFilterStage(), *this, sourcePtr, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter"); -// NActors::TActivationContext::AsActorContext().Send(GetContext()->GetCommonContext()->GetReadCoordinatorActorId(), new NOlap::NBlobOperations::NRead::TEvStartReadTask(constructor)); + auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); + return true; } -void TPortionDataSource::DoStartFetchStage(const std::shared_ptr& sourcePtr) { - Y_ABORT_UNLESS(!FetchStageData); - Y_ABORT_UNLESS(FilterStageData); - if (FetchingPlan->GetFetchingStage()->GetSize() && !FilterStageData->IsEmptyFilter()) { - auto& columnIds = FetchingPlan->GetFetchingStage()->GetColumnIds(); +void TPortionDataSource::DoAbort() { +} - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "RealStartFetchStage")("fetching_info", FetchingPlan->DebugString()); - auto readAction = Portion->GetBlobsStorage()->StartReadingAction("CS::READ::FETCHING"); - readAction->SetIsBackgroundProcess(false); - THashMap nullBlocks; - NeedFetchColumns(columnIds, readAction, nullBlocks, GetFilterStageData().GetAppliedFilter()); - if (readAction->GetExpectedBlobsCount()) { - std::vector> actions = {readAction}; - auto constructor = std::make_shared(GetContext(), actions, std::move(nullBlocks), FetchingPlan->GetFetchingStage(), *this, sourcePtr, "ReaderFetcher"); - NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); - return; - } - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DontStartFetchStage")("fetching_info", FetchingPlan->DebugString()); +bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& /*columns*/) { + if (ReadStarted) { + return false; } - InitFetchStageData(nullptr); -} + ReadStarted = true; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step->GetName())("fetching_info", step->DebugString()); -void TPortionDataSource::DoAbort() { -} + std::shared_ptr storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator(); + auto readAction = storageOperator->StartReadingAction("CS::READ::" + step->GetName()); -void TCommittedDataSource::DoFetch(const std::shared_ptr& sourcePtr) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoFetch"); - if (!ReadStarted) { - Y_ABORT_UNLESS(!ResultReady); - ReadStarted = true; + readAction->SetIsBackgroundProcess(false); + readAction->AddRange(CommittedBlob.GetBlobRange()); - std::shared_ptr storageOperator = GetContext()->GetCommonContext()->GetStoragesManager()->GetInsertOperator(); - auto readAction = storageOperator->StartReadingAction("CS::READ::COMMITTED"); - readAction->SetIsBackgroundProcess(false); - readAction->AddRange(CommittedBlob.GetBlobRange()); + std::vector> actions = {readAction}; + auto constructor = std::make_shared(actions, sourcePtr, step, GetContext(), "CS::READ::" + step->GetName(), ""); + NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); + return true; +} - THashMap nullBlocks; - std::vector> actions = {readAction}; - auto constructor = std::make_shared(GetContext(), actions, std::move(nullBlocks), *this, sourcePtr, "ReaderCommitted"); - NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); +void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr& columns) { + if (!GetStageData().GetTable()) { + Y_ABORT_UNLESS(GetStageData().GetBlobs().size() == 1); + auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first); + auto batch = NArrow::DeserializeBatch(bData, GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion())); + Y_ABORT_UNLESS(batch); + batch = GetContext()->GetReadMetadata()->GetIndexInfo().AddSpecialColumns(batch, CommittedBlob.GetSnapshot()); + MutableStageData().AddBatch(batch); } + MutableStageData().SyncTableColumns(columns->GetSchema()->fields()); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index 693a4353bcaf..34f6a1fe0412 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace NKikimr::NOlap { @@ -18,6 +19,7 @@ namespace NKikimr::NOlap::NPlainReader { class TFetchingInterval; class TPlainReadData; class IFetchTaskConstructor; +class IFetchingStep; class IDataSource { private: @@ -25,6 +27,8 @@ class IDataSource { YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish); YDB_READONLY_DEF(std::shared_ptr, Context); + YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); + std::optional RecordsCount; YDB_READONLY(ui32, IntervalsCount, 0); virtual NJson::TJsonValue DoDebugJson() const = 0; bool MergingStartedFlag = false; @@ -32,21 +36,38 @@ class IDataSource { protected: THashMap Intervals; - std::shared_ptr FilterStageData; - std::shared_ptr FetchStageData; - - std::optional FetchingPlan; + std::shared_ptr StageData; TAtomic FilterStageFlag = 0; + bool IsReadyFlag = false; bool IsAborted() const { return AbortedFlag; } - virtual void DoStartFilterStage(const std::shared_ptr& sourcePtr) = 0; - virtual void DoStartFetchStage(const std::shared_ptr& sourcePtr) = 0; + virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) = 0; + virtual void DoAssembleColumns(const std::shared_ptr& columns) = 0; virtual void DoAbort() = 0; public: + void SetIsReady(); + + bool IsEmptyData() const { + return GetStageData().IsEmpty(); + } + + void AssembleColumns(const std::shared_ptr& columns) { + if (columns->IsEmpty()) { + return; + } + DoAssembleColumns(columns); + } + + bool StartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) { + AFL_VERIFY(columns); + return DoStartFetchingColumns(sourcePtr, step, columns); + } + void InitFetchingPlan(const std::shared_ptr& fetchingFirstStep, const std::shared_ptr& sourcePtr, const bool isExclusive); + std::shared_ptr GetLastPK() const { return Finish.ExtractSortingPosition(); } @@ -56,11 +77,6 @@ class IDataSource { virtual ui64 GetRawBytes(const std::set& columnIds) const = 0; - const TFetchingPlan& GetFetchingPlan() const { - Y_ABORT_UNLESS(FetchingPlan); - return *FetchingPlan; - } - bool IsMergingStarted() const { return MergingStartedFlag; } @@ -87,35 +103,37 @@ class IDataSource { bool OnIntervalFinished(const ui32 intervalIdx); - std::shared_ptr GetBatch() const { - if (!FilterStageData || !FetchStageData) { - return nullptr; - } - return NArrow::MergeColumns({FilterStageData->GetBatch(), FetchStageData->GetBatch()}); - } - bool IsDataReady() const { - return !!FilterStageData && !!FetchStageData; + return IsReadyFlag; } - const TFilterStageData& GetFilterStageData() const { - Y_ABORT_UNLESS(FilterStageData); - return *FilterStageData; + const TFetchedData& GetStageData() const { + AFL_VERIFY(StageData); + return *StageData; } - void InitFetchingPlan(const TFetchingPlan& fetchingPlan, const std::shared_ptr& sourcePtr); + TFetchedData& MutableStageData() { + AFL_VERIFY(StageData); + return *StageData; + } - void InitFilterStageData(const std::shared_ptr& appliedFilter, const std::shared_ptr& earlyFilter, const std::shared_ptr& batch - , const std::shared_ptr& sourcePtr); - void InitFetchStageData(const std::shared_ptr& batch); + ui32 GetRecordsCount() const { + AFL_VERIFY(RecordsCount); + return *RecordsCount; + } void RegisterInterval(TFetchingInterval& interval); - IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, + const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, + const TSnapshot& recordSnapshotMax, const std::optional recordsCount + ) : SourceIdx(sourceIdx) , Start(start) , Finish(finish) , Context(context) + , RecordSnapshotMax(recordSnapshotMax) + , RecordsCount(recordsCount) { if (Start.IsReverseSort()) { std::swap(Start, Finish); @@ -137,8 +155,11 @@ class TPortionDataSource: public IDataSource { const std::shared_ptr& readingAction, THashMap& nullBlocks, const std::shared_ptr& filter); - virtual void DoStartFilterStage(const std::shared_ptr& sourcePtr) override; - virtual void DoStartFetchStage(const std::shared_ptr& sourcePtr) override; + virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const std::shared_ptr& step, const std::shared_ptr& columns) override; + virtual void DoAssembleColumns(const std::shared_ptr& columns) override { + auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchema(Portion->GetMinSnapshot()); + MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable()); + } virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("type", "portion"); @@ -162,7 +183,7 @@ class TPortionDataSource: public IDataSource { TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr& portion, const std::shared_ptr& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) - : TBase(sourceIdx, context, start, finish) + : TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount()) , Portion(portion) { } @@ -173,20 +194,14 @@ class TCommittedDataSource: public IDataSource { using TBase = IDataSource; TCommittedBlob CommittedBlob; bool ReadStarted = false; - bool ResultReady = false; - void DoFetch(const std::shared_ptr& sourcePtr); virtual void DoAbort() override { } - virtual void DoStartFilterStage(const std::shared_ptr& sourcePtr) override { - DoFetch(sourcePtr); - } - - virtual void DoStartFetchStage(const std::shared_ptr& sourcePtr) override { - DoFetch(sourcePtr); - } + virtual bool DoStartFetchingColumns(const std::shared_ptr& sourcePtr, + const std::shared_ptr& step, const std::shared_ptr& columns) override; + virtual void DoAssembleColumns(const std::shared_ptr& columns) override; virtual NJson::TJsonValue DoDebugJson() const override { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("type", "commit"); @@ -204,7 +219,7 @@ class TCommittedDataSource: public IDataSource { TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context, const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) - : TBase(sourceIdx, context, start, finish) + : TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {}) , CommittedBlob(committed) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make index c9224b8d780c..4a4db941aa67 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make @@ -2,16 +2,14 @@ LIBRARY() SRCS( scanner.cpp - source.cpp constructor.cpp + source.cpp interval.cpp fetched_data.cpp plain_read_data.cpp - filter_assembler.cpp - column_assembler.cpp - committed_assembler.cpp columns_set.cpp context.cpp + fetching.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 8164f28f82bd..bc5d5fa1ba86 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -57,7 +57,6 @@ class TReadContext { private: YDB_READONLY_DEF(std::shared_ptr, StoragesManager); const NColumnShard::TConcreteScanCounters Counters; - YDB_READONLY(bool, IsInternalRead, false); TReadMetadataBase::TConstPtr ReadMetadata; NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext; const TActorId ScanActorId; @@ -97,11 +96,10 @@ class TReadContext { return ResourcesTaskContext; } - TReadContext(const std::shared_ptr& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead, const TReadMetadataBase::TConstPtr& readMetadata, + TReadContext(const std::shared_ptr& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const NOlap::TComputeShardingPolicy& computeShardingPolicy) : StoragesManager(storagesManager) , Counters(counters) - , IsInternalRead(isInternalRead) , ReadMetadata(readMetadata) , ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters) , ScanActorId(scanActorId) @@ -165,8 +163,6 @@ class IDataReader { TString DebugString(const bool verbose) const { TStringBuilder sb; - sb << "internal:" << Context->GetIsInternalRead() << ";" - ; sb << DoDebugString(verbose); return sb; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 729a50ae7a2c..988d6553b327 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -56,13 +56,6 @@ std::set TReadMetadata::GetEarlyFilterColumnIds() const { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } - if (Snapshot.GetPlanStep()) { - auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); - for (auto&& i : snapSchema->fields()) { - result.emplace(indexInfo.GetColumnId(i->name())); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name()); - } - } return result; } @@ -87,7 +80,6 @@ void TReadStats::PrintToLog() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) ("event", "statistic") ("begin", BeginTimestamp) - ("selected", SelectedIndex) ("index_granules", IndexGranules) ("index_portions", IndexPortions) ("index_batches", IndexBatches) diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 4d322c3716c0..4b0580a6bbcd 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -25,7 +25,6 @@ class TReadContext; struct TReadStats { TInstant BeginTimestamp; - ui32 SelectedIndex{0}; ui64 IndexGranules{0}; ui64 IndexPortions{0}; ui64 IndexBatches{0}; @@ -42,9 +41,8 @@ struct TReadStats { ui32 SelectedRows = 0; - TReadStats(ui32 indexNo = 0) + TReadStats() : BeginTimestamp(TInstant::Now()) - , SelectedIndex(indexNo) {} void PrintToLog(); @@ -166,7 +164,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_ , IndexVersions(info) , Snapshot(snapshot) , ResultIndexSchema(info.GetSchema(Snapshot)) - , ReadStats(std::make_shared(info.GetLastSchema()->GetIndexInfo().GetId())) + , ReadStats(std::make_shared()) { } @@ -271,7 +269,7 @@ struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_ if (!ResultIndexSchema) { return {}; } - auto f = ResultIndexSchema->GetFieldByColumnId(columnId); + auto f = ResultIndexSchema->GetFieldByColumnIdOptional(columnId); if (!f) { return {}; } diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 13dc7a35f1e9..a80496460e90 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -12,7 +12,7 @@ std::shared_ptr ISnapshotSchema::GetFieldByIndex(const int index) } return schema->field(index); } -std::shared_ptr ISnapshotSchema::GetFieldByColumnId(const ui32 columnId) const { +std::shared_ptr ISnapshotSchema::GetFieldByColumnIdOptional(const ui32 columnId) const { return GetFieldByIndex(GetFieldIndex(columnId)); } @@ -111,8 +111,8 @@ ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const { } std::shared_ptr ISnapshotSchema::GetFieldByColumnIdVerified(const ui32 columnId) const { - auto result = GetFieldByColumnId(columnId); - AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId); + auto result = GetFieldByColumnIdOptional(columnId); + AFL_VERIFY(result)("event", "unknown_column")("column_id", columnId)("schema", DebugString()); return result; } diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 9232edcf1dfe..b063ba7d02ef 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -51,7 +51,7 @@ class ISnapshotSchema { ui32 GetColumnId(const std::string& columnName) const; std::shared_ptr GetFieldByIndex(const int index) const; - std::shared_ptr GetFieldByColumnId(const ui32 columnId) const; + std::shared_ptr GetFieldByColumnIdOptional(const ui32 columnId) const; std::shared_ptr GetFieldByColumnIdVerified(const ui32 columnId) const; TString DebugString() const { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 1e72a2e7ea56..b7423485094d 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -206,8 +206,7 @@ class TGranuleMeta: TNonCopyable { std::shared_ptr BuildSerializationStats(ISnapshotSchema::TPtr schema) const { auto result = std::make_shared(); for (auto&& i : GetAdditiveSummary().GetCompacted().GetColumnStats()) { - auto field = schema->GetFieldByColumnId(i.first); - AFL_VERIFY(field)("column_id", i.first)("schema", schema->DebugString()); + auto field = schema->GetFieldByColumnIdVerified(i.first); NOlap::TColumnSerializationStat columnInfo(i.first, field->name()); columnInfo.Merge(i.second); result->AddStat(columnInfo); diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index 5edc9c18d119..feda743b2684 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -63,7 +63,7 @@ class TDefaultSchemaDetails: public ISchemaDetailInfo { AFL_VERIFY(Stats); } virtual std::shared_ptr GetField(const ui32 columnId) const override { - return Schema->GetFieldByColumnId(columnId); + return Schema->GetFieldByColumnIdOptional(columnId); } virtual bool NeedMinMaxForColumn(const ui32 columnId) const override { return Schema->GetIndexInfo().GetMinMaxIdxColumns().contains(columnId); diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index dfc554c4ad12..d08e22258981 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -685,6 +685,7 @@ std::vector> TestTiers(bool reboots, const std::vector reader; if (!misconfig) { reader = std::make_unique(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max())); + reader->SetReplyColumns({specs[i].TtlColumn}); counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here counter.WaitReadsCaptured(runtime); reader->InitializeScanner();