6 changes: 3 additions & 3 deletions ydb/core/formats/arrow/arrow_filter.cpp
@@ -323,7 +323,7 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
}
if (filter.IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
return false;
return true;
}
if (filter.IsTotalAllowFilter()) {
return true;
@@ -343,11 +343,11 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
return false;
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
}

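A minimal caller-side sketch of the new contract (the helper function is invented for illustration, and the namespace is assumed to be NKikimr::NArrow; the matching declarations appear in arrow_filter.h below): Apply is now const-qualified, and a total-deny filter is no longer reported as a failure. The batch is sliced to zero rows and the call returns true, so callers can reserve false for genuine errors.

#include <ydb/core/formats/arrow/arrow_filter.h>
#include <arrow/record_batch.h>
#include <memory>

// Hypothetical caller, not part of this PR: with this change a fully denying
// filter is not treated as an error. Apply() replaces the batch with an empty
// slice and returns true, and the filter object itself can be const.
bool ApplyAndValidate(const NKikimr::NArrow::TColumnFilter& filter,
                      std::shared_ptr<arrow::RecordBatch>& batch) {
    if (!filter.Apply(batch)) {          // false now signals a real failure only
        return false;
    }
    // The batch may legitimately have zero rows here (total-deny case).
    return batch->Validate().ok();
}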
4 changes: 2 additions & 2 deletions ydb/core/formats/arrow/arrow_filter.h
@@ -171,8 +171,8 @@ class TColumnFilter {
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);

bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {});
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;

// Combines filters by 'and' operator (extFilter count is true positions count in self, thought extFitler patch exactly that positions)
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard__read_base.cpp
@@ -45,6 +45,7 @@ bool TTxReadBase::ParseProgram(NKikimrSchemeOp::EOlapProgramType programType,
AFL_VERIFY(namesChecker.emplace(names.back()).second);
}
NOlap::TProgramContainer container;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "overriden_columns")("columns", JoinSeq(",", names));
container.OverrideProcessingColumns(std::vector<TString>(names.begin(), names.end()));
read.SetProgram(std::move(container));
return true;
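The new trace line records the overridden column list as a single comma-separated string. A tiny stand-alone sketch of the JoinSeq call it relies on (the column names below are made up; Arcadia's util/string/join.h semantics are assumed):

#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/stream/output.h>
#include <util/string/join.h>

int main() {
    // Same shape of argument the added AFL_DEBUG line passes as "columns".
    TVector<TString> names = {"timestamp", "resource_id", "level"};
    Cout << JoinSeq(",", names) << Endl;  // prints: timestamp,resource_id,level
    return 0;
}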
26 changes: 7 additions & 19 deletions ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -118,7 +118,7 @@ class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IR
ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));

std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false,
std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool,
ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);

@@ -367,7 +367,7 @@ class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IR
return Finish();
}

auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
}
@@ -921,29 +921,17 @@ class TCurrentBatch {
Results.emplace_back(std::move(res));
}

void FillResult(std::vector<TPartialReadResult>& result, const bool mergePartsToMax) const {
void FillResult(std::vector<TPartialReadResult>& result) const {
if (Results.empty()) {
return;
}
if (mergePartsToMax) {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>> guards;
for (auto&& i : Results) {
batches.emplace_back(i.GetResultBatchPtrVerified());
guards.insert(guards.end(), i.GetResourcesGuards().begin(), i.GetResourcesGuards().end());
}
auto res = NArrow::CombineBatches(batches);
AFL_VERIFY(res);
result.emplace_back(TPartialReadResult(guards, NArrow::TShardedRecordBatch(res), Results.back().GetLastReadKey()));
} else {
for (auto&& i : Results) {
result.emplace_back(std::move(i));
}
for (auto&& i : Results) {
result.emplace_back(std::move(i));
}
}
};

std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax) {
std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult) {
std::vector<TCurrentBatch> resultBatches;
TCurrentBatch currentBatch;
for (auto&& i : resultsExt) {
@@ -960,7 +948,7 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults

std::vector<TPartialReadResult> result;
for (auto&& i : resultBatches) {
i.FillResult(result, mergePartsToMax);
i.FillResult(result);
}
return result;
}
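A compact stand-alone model (toy type, invented for illustration) of the FillResult simplification that dropping mergePartsToMax enables: the branch that combined a chunk's record batches via NArrow::CombineBatches is gone, so every chunk now simply moves its partial results into the output unchanged. The matching signature change in columnshard__scan.h follows below.

#include <utility>
#include <vector>

// Toy stand-in for TPartialReadResult; only ownership transfer matters here.
struct TToyPartialResult {
    std::vector<int> Rows;
};

// New behaviour of TCurrentBatch::FillResult: no merging, a plain move per part.
// (Previously, with mergePartsToMax == true, all parts of the chunk were
// combined into a single result batch before being appended.)
void FillResult(std::vector<TToyPartialResult>& result, std::vector<TToyPartialResult>&& chunkParts) {
    if (chunkParts.empty()) {
        return;
    }
    for (auto&& part : chunkParts) {
        result.emplace_back(std::move(part));
    }
}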
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__scan.h
@@ -47,7 +47,7 @@ class TPartialReadResult {
return ResultBatch.GetRecordsCount();
}

static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult, const bool mergePartsToMax);
static std::vector<TPartialReadResult> SplitResults(std::vector<TPartialReadResult>&& resultsExt, const ui32 maxRecordsInResult);

const NArrow::TShardedRecordBatch& GetShardedBatch() const {
return ResultBatch;
32 changes: 18 additions & 14 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -97,7 +97,7 @@ class TPortionInfo {
TSerializationStats GetSerializationStat(const ISnapshotSchema& schema) const {
TSerializationStats result;
for (auto&& i : Records) {
if (schema.GetFieldByColumnId(i.ColumnId)) {
if (schema.GetFieldByColumnIdOptional(i.ColumnId)) {
result.AddStat(i.GetSerializationStat(schema.GetFieldByColumnIdVerified(i.ColumnId)->name()));
}
}
@@ -151,8 +151,7 @@ class TPortionInfo {
: PathId(pathId)
, Portion(portionId)
, MinSnapshot(minSnapshot)
, BlobsOperator(blobsOperator)
{
, BlobsOperator(blobsOperator) {
}

TString DebugString(const bool withDetails = false) const;
@@ -399,22 +398,30 @@ class TPortionInfo {
public:
TAssembleBlobInfo(const ui32 rowsCount)
: NullRowsCount(rowsCount) {

AFL_VERIFY(NullRowsCount);
}

TAssembleBlobInfo(const TString& data)
: Data(data) {

AFL_VERIFY(!!Data);
}

ui32 GetNullRowsCount() const noexcept {
return NullRowsCount;
return NullRowsCount;
}

const TString& GetData() const noexcept {
return Data;
}

bool IsBlob() const {
return !NullRowsCount && !!Data;
}

bool IsNull() const {
return NullRowsCount && !Data;
}

std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(const TColumnLoader& loader) const;
};

@@ -437,8 +444,7 @@ class TPortionInfo {

TPreparedColumn(std::vector<TAssembleBlobInfo>&& blobs, const std::shared_ptr<TColumnLoader>& loader)
: Loader(loader)
, Blobs(std::move(blobs))
{
, Blobs(std::move(blobs)) {
Y_ABORT_UNLESS(Loader);
Y_ABORT_UNLESS(Loader->GetExpectedSchema()->num_fields() == 1);
}
@@ -505,8 +511,7 @@ class TPortionInfo {
TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
: Columns(std::move(columns))
, Schema(schema)
, RowsCount(rowsCount)
{
, RowsCount(rowsCount) {
}

std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
@@ -525,8 +530,7 @@ class TPortionInfo {
: ColumnId(resultLoader->GetColumnId())
, NumRows(numRows)
, DataLoader(dataLoader)
, ResultLoader(resultLoader)
{
, ResultLoader(resultLoader) {
AFL_VERIFY(ResultLoader);
if (DataLoader) {
AFL_VERIFY(ResultLoader->GetColumnId() == DataLoader->GetColumnId());
@@ -598,8 +602,8 @@ class TPortionInfo {
}

std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
const ISnapshotSchema& resultSchema,
THashMap<TBlobRange, TString>& data) const {
const ISnapshotSchema& resultSchema,
THashMap<TBlobRange, TString>& data) const {
auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
Y_ABORT_UNLESS(batch->Validate().ok());
return batch;
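TAssembleBlobInfo now verifies its input in both constructors, so an instance is either a null-rows placeholder or a real blob, never both and never neither, and IsBlob()/IsNull() expose that invariant. A stand-alone sketch of the same idea (simplified toy types, not the real class):

#include <cassert>
#include <cstdint>
#include <string>

// Simplified model of TAssembleBlobInfo after this change: exactly one of the
// two states is populated, and the constructors enforce it up front.
class TToyAssembleBlobInfo {
    uint32_t NullRowsCount = 0;
    std::string Data;

public:
    explicit TToyAssembleBlobInfo(const uint32_t rowsCount)
        : NullRowsCount(rowsCount) {
        assert(NullRowsCount);           // mirrors AFL_VERIFY(NullRowsCount)
    }

    explicit TToyAssembleBlobInfo(const std::string& data)
        : Data(data) {
        assert(!Data.empty());           // mirrors AFL_VERIFY(!!Data)
    }

    bool IsBlob() const { return !NullRowsCount && !Data.empty(); }
    bool IsNull() const { return NullRowsCount && Data.empty(); }
};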
@@ -12,6 +12,12 @@ TString TColumnsSet::DebugString() const {
}

NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const {
if (external.IsEmpty()) {
return *this;
}
if (IsEmpty()) {
return external;
}
TColumnsSet result = *this;
for (auto&& i : external.ColumnIds) {
result.ColumnIds.erase(i);
@@ -28,6 +34,12 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator-(const TColumnsS
}

NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const {
if (external.IsEmpty()) {
return *this;
}
if (IsEmpty()) {
return external;
}
TColumnsSet result = *this;
result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
auto fields = result.Schema->fields();
@@ -42,7 +54,7 @@ NKikimr::NOlap::NPlainReader::TColumnsSet TColumnsSet::operator+(const TColumnsS
}

bool TColumnsSet::ColumnsOnly(const std::vector<std::string>& fieldNames) const {
if (fieldNames.size() != GetSize()) {
if (fieldNames.size() != GetColumnsCount()) {
return false;
}
std::set<std::string> fieldNamesSet;
@@ -64,11 +76,7 @@ void TColumnsSet::Rebuild() {
ColumnNamesVector.emplace_back(i);
ColumnNames.emplace(i);
}
if (ColumnIds.size()) {
FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
} else {
FilteredSchema = FullReadSchema;
}
FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
}

}
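A stand-alone sketch (id-only toy type, invented for illustration) mirroring the empty-set short-circuits added to operator+ and operator- above: when either operand is empty, the other is returned as-is instead of rebuilding the column set and schema. Note that Rebuild() now always constructs a TFilteredSnapshotSchema, even when ColumnIds is empty.

#include <cstdint>
#include <set>

// Toy id-only stand-in for TColumnsSet (no schema handling), mirroring the
// empty-set short-circuits introduced in this diff.
struct TToyColumnsSet {
    std::set<uint32_t> ColumnIds;

    bool IsEmpty() const { return ColumnIds.empty(); }

    TToyColumnsSet operator+(const TToyColumnsSet& external) const {
        if (external.IsEmpty()) {
            return *this;                // nothing to add
        }
        if (IsEmpty()) {
            return external;             // result is just the other operand
        }
        TToyColumnsSet result = *this;
        result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
        return result;
    }

    TToyColumnsSet operator-(const TToyColumnsSet& external) const {
        if (external.IsEmpty()) {
            return *this;                // subtracting nothing
        }
        if (IsEmpty()) {
            return external;             // same short-circuit as the diff
        }
        TToyColumnsSet result = *this;
        for (auto&& id : external.ColumnIds) {
            result.ColumnIds.erase(id);
        }
        return result;
    }
};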
@@ -18,12 +18,19 @@ class TColumnsSet {

public:
TColumnsSet() = default;
bool IsEmpty() const {
return ColumnIds.empty();
}

bool operator!() const {
return IsEmpty();
}

const std::vector<TString>& GetColumnNamesVector() const {
return ColumnNamesVector;
}

ui32 GetSize() const {
ui32 GetColumnsCount() const {
return ColumnIds.size();
}

@@ -96,27 +103,4 @@ class TColumnsSet {
TColumnsSet operator-(const TColumnsSet& external) const;
};

class TFetchingPlan {
private:
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FilterStage);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FetchingStage);
bool CanUseEarlyFilterImmediatelyFlag = false;
public:
TFetchingPlan(const std::shared_ptr<TColumnsSet>& filterStage, const std::shared_ptr<TColumnsSet>& fetchingStage, const bool canUseEarlyFilterImmediately)
: FilterStage(filterStage)
, FetchingStage(fetchingStage)
, CanUseEarlyFilterImmediatelyFlag(canUseEarlyFilterImmediately) {

}

TString DebugString() const {
return TStringBuilder() << "{filter=" << (FilterStage ? FilterStage->DebugString() : "NO") << ";fetching=" <<
(FetchingStage ? FetchingStage->DebugString() : "NO") << ";use_filter=" << CanUseEarlyFilterImmediatelyFlag << "}";
}

bool CanUseEarlyFilterImmediately() const {
return CanUseEarlyFilterImmediatelyFlag;
}
};

}

This file was deleted.

This file was deleted.
