Skip to content

Commit

Permalink
Add SelectivityVector to DecodedVector::nulls() (#8579)
Browse files Browse the repository at this point in the history
Summary:
If DecodedVector decodes a partial set of rows and makes a local materialization of indices, nulls() without SelectivityVector will access all elements in indices, which will hit random memory locations. Therefore, pass a SelectivityVector to nulls(). Require that if decode() is for partial rows, then nulls() is also for partial rows. There The rows for null() must be a subset of the rows for decode(). We check that the nength at nulls() is no greater than that at decode().

Updates call sites of DecodedVector::nulls(). Updates tests.

Pull Request resolved: #8579

Reviewed By: bikramSingh91

Differential Revision: D53158552

Pulled By: oerling

fbshipit-source-id: 7431b5d72252b8cc3d2b69ff4e5c47f2092c5db3
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Feb 22, 2024
1 parent 3bb21d2 commit ffd136f
Show file tree
Hide file tree
Showing 26 changed files with 92 additions and 56 deletions.
5 changes: 3 additions & 2 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {
changed = true;
// NOTE: the true value of a raw null bit indicates non-null so we AND
// 'rawActiveRows' with the raw bit.
bits::andBits(rawActiveRows, decoded.nulls(), 0, activeRows_.end());
bits::andBits(
rawActiveRows, decoded.nulls(&activeRows_), 0, activeRows_.end());
}
};
for (auto channel : keyFilterChannels_) {
Expand Down Expand Up @@ -358,7 +359,7 @@ void HashBuild::addInput(RowVectorPtr input) {
for (auto& hasher : hashers) {
auto& decoded = hasher->decodedVector();
if (decoded.mayHaveNulls()) {
auto* nulls = decoded.nulls();
auto* nulls = decoded.nulls(&activeRows_);
if (nulls && bits::countNulls(nulls, 0, activeRows_.end()) > 0) {
joinHasNullKeys_ = true;
break;
Expand Down
13 changes: 12 additions & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,10 +1009,21 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
filterInputColumnDecodedVector_.decode(
*filterInput_->childAt(projection.outputChannel), filterInputRows_);
if (filterInputColumnDecodedVector_.mayHaveNulls()) {
SelectivityVector nullsInActiveRows(numRows);
memcpy(
nullsInActiveRows.asMutableRange().bits(),
filterInputColumnDecodedVector_.nulls(&filterInputRows_),
bits::nbytes(numRows));
// All rows that are not active count as non-null here.
bits::orWithNegatedBits(
nullsInActiveRows.asMutableRange().bits(),
filterInputRows_.asRange().bits(),
0,
numRows);
// NOTE: the false value of a raw null bit indicates null so we OR with
// negative of the raw bit.
bits::orWithNegatedBits(
rawNullRows, filterInputColumnDecodedVector_.nulls(), 0, numRows);
rawNullRows, nullsInActiveRows.asRange().bits(), 0, numRows);
}
}
nullFilterInputRows_.updateBounds();
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void deselectRowsWithNulls(
auto& decoded = hashers[i]->decodedVector();
if (decoded.mayHaveNulls()) {
anyChange = true;
const auto* nulls = hashers[i]->decodedVector().nulls();
const auto* nulls = hashers[i]->decodedVector().nulls(&rows);
bits::andBits(rows.asMutableRange().bits(), nulls, 0, rows.end());
}
}
Expand Down Expand Up @@ -219,7 +219,7 @@ vector_size_t processEncodedFilterResults(
DecodedVector& decoded = filterEvalCtx.decodedResult;
decoded.decode(*filterResult.get(), rows);
auto values = decoded.data<uint64_t>();
auto nulls = decoded.nulls();
auto nulls = decoded.nulls(&rows);
auto indices = decoded.indices();

vector_size_t passed = 0;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void PartitionedOutput::collectNullRows() {
auto& keyVector = input_->childAt(i);
if (keyVector->mayHaveNulls()) {
decodedVectors_[i].decode(*keyVector, rows_);
if (auto* rawNulls = decodedVectors_[i].nulls()) {
if (auto* rawNulls = decodedVectors_[i].nulls(&rows_)) {
bits::orWithNegatedBits(
nullRows_.asMutableRange().bits(), rawNulls, 0, size);
}
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/VectorHasher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ bool VectorHasher::makeValueIdsFlatWithNulls<bool>(
const SelectivityVector& rows,
uint64_t* result) {
const auto* values = decoded_.data<uint64_t>();
const auto* nulls = decoded_.nulls();
const auto* nulls = decoded_.nulls(&rows);
rows.applyToSelected([&](vector_size_t row) INLINE_LAMBDA {
if (bits::isBitNull(nulls, row)) {
if (multiplier_ == 1) {
Expand Down Expand Up @@ -210,7 +210,7 @@ bool VectorHasher::makeValueIdsFlatWithNulls(
const SelectivityVector& rows,
uint64_t* result) {
const auto* values = decoded_.data<T>();
const auto* nulls = decoded_.nulls();
const auto* nulls = decoded_.nulls(&rows);

bool success = true;
rows.applyToSelected([&](vector_size_t row) INLINE_LAMBDA {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class GeneratedVectorFunction : public GeneratedVectorFunctionBase {
auto deselectNull = [&](const VectorPtr& arg) {
if (arg->mayHaveNulls()) {
exec::LocalDecodedVector decodedVector(context, *arg, rowsNotNull);
if (auto* rawNulls = decodedVector->nulls()) {
if (auto* rawNulls = decodedVector->nulls(&rowsNotNull)) {
rowsNotNull.deselectNulls(rawNulls, rows.begin(), rows.end());
}
}
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/BooleanMix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ BooleanMix getFlatBool(
memset(valuesToSet, 0, bits::nbytes(size));
DecodedVector decoded(*vector, activeRows);
auto values = decoded.data<uint64_t>();
auto nulls = decoded.nulls();
auto nulls = decoded.nulls(&activeRows);
auto indices = decoded.indices();
activeRows.applyToSelected([&](int32_t i) {
auto index = indices[i];
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/CastExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ void CastExpr::apply(
context.deselectErrors(*remainingRows);

LocalDecodedVector decoded(context, *input, *remainingRows);
auto* rawNulls = decoded->nulls();
auto* rawNulls = decoded->nulls(remainingRows.get());

if (rawNulls) {
remainingRows->deselectNulls(
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/CoalesceExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void CoalesceExpr::evalSpecialForm(
}

decodedVector.get()->decode(*result, *activeRows);
const uint64_t* rawNulls = decodedVector->nulls();
const uint64_t* rawNulls = decodedVector->nulls(activeRows);
if (!rawNulls) {
// No nulls left.
return;
Expand Down
4 changes: 2 additions & 2 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ bool Expr::evalArgsDefaultNulls(
auto& arg = inputValues_[i];
if (arg->mayHaveNulls()) {
decoded.get()->decode(*arg, rows.rows());
flatNulls = decoded.get()->nulls();
flatNulls = decoded.get()->nulls(&rows.rows());
}
// A null with no error deselects the row.
// An error adds itself to argument errors.
Expand Down Expand Up @@ -1111,7 +1111,7 @@ bool Expr::removeSureNulls(

if (values->mayHaveNulls()) {
LocalDecodedVector decoded(context, *values, rows);
if (auto* rawNulls = decoded->nulls()) {
if (auto* rawNulls = decoded->nulls(&rows)) {
if (!result) {
result = nullHolder.get(rows);
}
Expand Down
6 changes: 3 additions & 3 deletions velox/expression/FieldReference.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ void FieldReference::apply(
if (decoded.mayHaveNulls()) {
nonNullRowsHolder.get(rows);
nonNullRowsHolder->deselectNulls(
decoded.nulls(), rows.begin(), rows.end());
decoded.nulls(&rows), rows.begin(), rows.end());
nonNullRows = nonNullRowsHolder.get();
if (!nonNullRows->hasSelections()) {
addNulls(rows, decoded.nulls(), context, result);
addNulls(rows, decoded.nulls(&rows), context, result);
return;
}
}
Expand Down Expand Up @@ -116,7 +116,7 @@ void FieldReference::apply(

// Check for nulls in the input struct. Propagate these nulls to 'result'.
if (!inputs_.empty() && decoded.mayHaveNulls()) {
addNulls(rows, decoded.nulls(), context, result);
addNulls(rows, decoded.nulls(&rows), context, result);
}
}

Expand Down
4 changes: 2 additions & 2 deletions velox/expression/SwitchExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ void SwitchExpr::evalSpecialForm(
const auto& vector = context.getField(field->index(context));
if (vector->mayHaveNulls()) {
LocalDecodedVector decoded(context, *vector, remaining);
addNulls(remaining, decoded->nulls(), context, localResult);
addNulls(remaining, decoded->nulls(&remaining), context, localResult);
remaining.deselectNulls(
decoded->nulls(), remaining.begin(), remaining.end());
decoded->nulls(&remaining), remaining.begin(), remaining.end());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/tests/ExpressionFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ BufferPtr extractNonNullIndices(const RowVectorPtr& data) {

for (auto& child : data->children()) {
decoded.decode(*child);
auto* rawNulls = decoded.nulls();
auto* rawNulls = decoded.nulls(nullptr);
if (rawNulls) {
nonNullRows.deselectNulls(rawNulls, 0, data->size());
}
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/tests/ExpressionFuzzerVerifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ BufferPtr extractNonNullIndices(const RowVectorPtr& data) {

for (auto& child : data->children()) {
decoded.decode(*child);
auto* rawNulls = decoded.nulls();
auto* rawNulls = decoded.nulls(nullptr);
if (rawNulls) {
nonNullRows.deselectNulls(rawNulls, 0, data->size());
}
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/lib/IsNull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class IsNullFunction : public exec::VectorFunction {
isNull = AlignedBuffer::allocate<bool>(rows.end(), pool);
memcpy(
isNull->asMutable<int64_t>(),
decodedArgs.at(0)->nulls(),
decodedArgs.at(0)->nulls(&rows),
bits::nbytes(rows.end()));

if (!IsNotNULL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ class ApproxPercentileAggregate : public exec::Aggregate {
innerRows.updateBounds();
} else {
velox::translateToInnerRows(
rows, decoded.indices(), decoded.nulls(), innerRows);
rows, decoded.indices(), decoded.nulls(&rows), innerRows);
}
baseRows = &innerRows;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/prestosql/aggregates/ReduceAgg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class ReduceAgg : public exec::Aggregate {
SelectivityVector remainingRows = rows;
if (input->mayHaveNulls()) {
DecodedVector decoded(*input, rows);
if (auto* rawNulls = decoded.nulls()) {
if (auto* rawNulls = decoded.nulls(&rows)) {
remainingRows.deselectNulls(rawNulls, rows.begin(), rows.end());
}
}
Expand Down
4 changes: 2 additions & 2 deletions velox/functions/sparksql/Comparisons.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ void applyTyped(
} else {
// (isnull(a) AND isnull(b)) || (a == b)
// When DecodedVector::nulls() is null it means there are no nulls.
auto* rawNulls0 = decodedLhs->nulls();
auto* rawNulls1 = decodedRhs->nulls();
auto* rawNulls0 = decodedLhs->nulls(&rows);
auto* rawNulls1 = decodedRhs->nulls(&rows);
rows.applyToSelected([&](vector_size_t i) {
auto isNull0 = rawNulls0 && bits::isBitNull(rawNulls0, i);
auto isNull1 = rawNulls1 && bits::isBitNull(rawNulls1, i);
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/sparksql/Hash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void applyWithType(
if (args[i]->mayHaveNulls()) {
*selectedMinusNulls.get(rows.end()) = rows;
selectedMinusNulls->deselectNulls(
decoded->nulls(), rows.begin(), rows.end());
decoded->nulls(&rows), rows.begin(), rows.end());
selected = selectedMinusNulls.get();
}
switch (args[i]->type()->kind()) {
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/sparksql/LeastGreatest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class LeastGreatestFunction final : public exec::VectorFunction {

// Only compare with non-null elements of each argument
*cmpRows = rows;
if (auto* rawNulls = decodedVectorHolder->nulls()) {
if (auto* rawNulls = decodedVectorHolder->nulls(&rows)) {
cmpRows->deselectNulls(rawNulls, 0, nrows);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void RowVector::copy(
}
}
} else {
auto nulls = decodedSource.nulls();
auto nulls = decodedSource.nulls(nullptr);

if (nulls) {
rows.applyToSelected([&](auto row) {
Expand Down
17 changes: 14 additions & 3 deletions velox/vector/DecodedVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void DecodedVector::decode(
const SelectivityVector* rows,
bool loadLazy) {
reset(end(vector.size(), rows));
partialRowsDecoded_ = rows != nullptr;
loadLazy_ = loadLazy;
bool isTopLevelLazyAndLoaded =
vector.isLazy() && vector.asUnchecked<LazyVector>()->isLoaded();
Expand Down Expand Up @@ -413,7 +414,7 @@ VectorPtr DecodedVector::wrap(
std::move(data));
}

const uint64_t* DecodedVector::nulls() {
const uint64_t* DecodedVector::nulls(const SelectivityVector* rows) {
if (allNulls_.has_value()) {
return allNulls_.value();
}
Expand All @@ -433,9 +434,19 @@ const uint64_t* DecodedVector::nulls() {
// Copy base nulls.
copiedNulls_.resize(bits::nwords(size_));
auto* rawCopiedNulls = copiedNulls_.data();
for (auto i = 0; i < size_; ++i) {
bits::setNull(rawCopiedNulls, i, bits::isBitNull(nulls_, indices_[i]));
VELOX_CHECK(
partialRowsDecoded_ == (rows != nullptr),
"DecodedVector::nulls() must be called with the same rows as decode()");
if (rows != nullptr) {
// Partial consistency check: The end may be less than the decode time
// end but not greater.
VELOX_CHECK_LE(rows->end(), size_);
}
auto baseSize = baseVector_->size();
applyToRows(rows, [&](auto i) {
VELOX_DCHECK_LT(indices_[i], baseSize);
bits::setNull(rawCopiedNulls, i, bits::isBitNull(nulls_, indices_[i]));
});
allNulls_ = copiedNulls_.data();
}
}
Expand Down
11 changes: 8 additions & 3 deletions velox/vector/DecodedVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ class DecodedVector {
///
/// nulls() ? bits::isBitNull(nulls(), i) : false
///
/// returns the null flag for top-level row 'i' given that 'i' is one of the
/// rows specified for decoding.
const uint64_t* nulls();
/// Only bit positions for decoded rows are valid. If 'rows' were
/// specified with decode(), the same rows have to be specified
/// here.
const uint64_t* nulls(const SelectivityVector* rows = nullptr);

/// Returns true if wrappings may have added nulls.
bool hasExtraNulls() const {
Expand Down Expand Up @@ -404,6 +405,10 @@ class DecodedVector {

bool loadLazy_ = false;

// True if decode() was called with rows != nullptr. If so, nulls() must also
// be called with the same 'rows'.
bool partialRowsDecoded_{true};

// Index of an element of the baseVector_ that points to a constant value of
// complex type. Applies only when isConstantMapping_ is true and baseVector_
// is of complex type (array, map, row).
Expand Down
6 changes: 3 additions & 3 deletions velox/vector/fuzzer/VectorFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,15 +998,15 @@ VectorPtr VectorLoaderWrap::makeEncodingPreservedCopy(
[&](auto row) { rawIndices[row] = decodedIndices[row]; });

BufferPtr nulls = nullptr;
if (decoded.nulls() || vectorSize > rows.end()) {
if (decoded.nulls(&rows) || vectorSize > rows.end()) {
// We fill [rows.end(), vectorSize) with nulls then copy nulls for selected
// baseRows.
nulls = allocateNulls(vectorSize, vector_->pool(), bits::kNull);
if (baseRows.hasSelections()) {
if (decoded.nulls()) {
if (decoded.nulls(&rows)) {
std::memcpy(
nulls->asMutable<uint64_t>(),
decoded.nulls(),
decoded.nulls(&rows),
bits::nbytes(rows.end()));
} else {
bits::fillBits(
Expand Down
Loading

0 comments on commit ffd136f

Please sign in to comment.