Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse result vector in Alpha reader #26

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 18 additions & 31 deletions dwio/alpha/velox/FieldReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1432,16 +1432,15 @@ class FlatMapKeyNode {
ALPHA_DCHECK(numValues == vector->size(), "Items not loaded");
}

// Try to load numValues from current position
// return true if 0+ values are read
bool load(uint32_t numValues) {
auto numItems = readInMapData(numValues);
if (numItems > 0) {
valueReader_->next(numItems, valueVector_, /* scatterBitmap */ nullptr);
ALPHA_DCHECK(numItems == valueVector_->size(), "Items not loaded");
return true;
}
return false;
uint32_t readInMapData(uint32_t numValues) {
inMapData_.resize(numValues);
numValues_ = readBooleanValues(inMapDecoder_, inMapData_.data(), numValues);
return numValues_;
}

void loadValues(velox::VectorPtr& values) {
valueReader_->next(numValues_, values, /* scatterBitmap */ nullptr);
ALPHA_DCHECK(numValues_ == values->size(), "Items not loaded");
}

void skip(uint32_t numValues) {
Expand All @@ -1459,21 +1458,12 @@ class FlatMapKeyNode {
return inMapData_[index];
}

const velox::VectorPtr& values() const {
return valueVector_;
}

void reset() {
inMapDecoder_->reset();
valueReader_->reset();
}

private:
uint32_t readInMapData(uint32_t numValues) {
inMapData_.resize(numValues);
return readBooleanValues(inMapDecoder_, inMapData_.data(), numValues);
}

// Merge the mapNulls and inMapData into mergedNulls
uint32_t mergeNulls(
uint32_t numValues,
Expand Down Expand Up @@ -1506,7 +1496,7 @@ class FlatMapKeyNode {
Decoder* inMapDecoder_;
Vector<bool> inMapData_;
const KeyValue<T>& key_;
velox::VectorPtr valueVector_;
uint32_t numValues_;
// nulls buffer used in parallel read cases.
Vector<char> mergedNulls_;
};
Expand Down Expand Up @@ -1818,24 +1808,20 @@ class MergedFlatMapFieldReader final
uint32_t nonNullCount = this->loadNulls(rowCount, vector);

nodes_.clear();
std::vector<const velox::BaseVector*> nodeValues;
size_t totalChildren = 0;
for (auto& node : this->keyNodes_) {
auto hasValues = node->load(nonNullCount);
if (hasValues) {
auto numValues = node->readInMapData(nonNullCount);
if (numValues > 0) {
nodes_.push_back(node.get());
const auto& nodeValueVector = node->values();
nodeValues.push_back(nodeValueVector.get());
totalChildren += nodeValueVector->size();
totalChildren += numValues;
}
}

auto& mapValueType = this->type_->asMap().valueType();
velox::VectorPtr valuesVector;
velox::VectorPtr nodeValues;
velox::VectorPtr& valuesVector = vector->mapValues();
if (totalChildren > 0) {
keysVector->resize(totalChildren, false);
initializeVector(valuesVector, mapValueType, this->pool_, nodeValues);
valuesVector->resize(totalChildren, false);
velox::BaseVector::prepareForReuse(valuesVector, totalChildren);
}

auto* offsetsPtr = offsets->asMutable<velox::vector_size_t>();
Expand All @@ -1861,7 +1847,8 @@ class MergedFlatMapFieldReader final
flatKeysVector->set(offsetsPtr[i], nodes_[j]->key().get());
++offsetsPtr[i];
}
valuesVector->copyRanges(nodeValues[j], copyRanges_);
nodes_[j]->loadValues(nodeValues);
valuesVector->copyRanges(nodeValues.get(), copyRanges_);
}
if (rowCount > 0) {
ALPHA_ASSERT(
Expand Down