Skip to content

Commit

Permalink
Add domain filter support for non-selective reader (#6977)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #6977

Reviewed By: oerling

Differential Revision: D50045199

fbshipit-source-id: d9645841547c25e2382004fd2285c23d284c36ca
  • Loading branch information
Yuhta authored and facebook-github-bot committed Oct 12, 2023
1 parent 42132d9 commit fbc6dcf
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 87 deletions.
42 changes: 8 additions & 34 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,40 +412,14 @@ HiveDataSource::HiveDataSource(
}

SubfieldFilters filters;
core::TypedExprPtr remainingFilter;
if (hiveTableHandle_->isFilterPushdownEnabled()) {
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
} else {
for (auto& [field, _] : hiveTableHandle_->subfieldFilters()) {
VELOX_USER_CHECK_EQ(
field.path().size(),
1,
"Unexpected filter on table {}, field {}",
hiveTableHandle_->tableName(),
field.toString());
auto* nestedField = dynamic_cast<const common::Subfield::NestedField*>(
field.path()[0].get());
VELOX_USER_CHECK_NOT_NULL(
nestedField,
"Unexpected filter on table {}, field {}",
hiveTableHandle_->tableName(),
field.toString());
VELOX_USER_CHECK_GT(
partitionKeys_.count(nestedField->name()),
0,
"Unexpected filter on table {}, field {}",
hiveTableHandle_->tableName(),
field.toString());
}
remainingFilter = hiveTableHandle_->remainingFilter();
}
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
auto remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);

std::vector<common::Subfield> remainingFilterSubfields;
if (remainingFilter) {
Expand Down
105 changes: 92 additions & 13 deletions velox/dwio/common/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,72 @@

namespace facebook::velox::dwio::common {

using namespace velox::common;

namespace {

template <TypeKind kKind>
bool filterSimpleVectorRow(
const BaseVector& vector,
Filter& filter,
vector_size_t index) {
using T = typename TypeTraits<kKind>::NativeType;
auto* simpleVector = vector.asUnchecked<SimpleVector<T>>();
return applyFilter(filter, simpleVector->valueAt(index));
}

// Returns true if row 'index' of 'vector' passes 'filter'.
//
// - A null row is decided by filter.testNull().
// - A non-null ARRAY, MAP or ROW value only accepts IsNull / IsNotNull
//   filters (any other filter kind raises a user error) and is decided by
//   filter.testNonNull().
// - Any other type kind is dispatched to filterSimpleVectorRow<kind>.
bool filterRow(const BaseVector& vector, Filter& filter, vector_size_t index) {
  if (vector.isNullAt(index)) {
    return filter.testNull();
  }

  const auto typeKind = vector.typeKind();
  const bool isComplexType = typeKind == TypeKind::ARRAY ||
      typeKind == TypeKind::MAP || typeKind == TypeKind::ROW;
  if (!isComplexType) {
    return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
        filterSimpleVectorRow, typeKind, vector, filter, index);
  }

  // Complex values cannot be compared against a value filter; only their
  // nullness can be tested.
  const auto filterKind = filter.kind();
  VELOX_USER_CHECK(
      filterKind == FilterKind::kIsNull ||
          filterKind == FilterKind::kIsNotNull,
      "Complex type can only take null filter, got {}",
      filter.toString());
  return filter.testNonNull();
}

// Clears from 'result' the bit of every row of 'vector' that fails the filters
// described by 'spec'. 'result' is a bitmap over the rows [0, vector.size());
// only rows whose bit is still set are tested against spec's own filter.
// For ROW vectors the child specs are then applied recursively to the
// matching child vectors, all sharing the same 'result' bitmap.
void applyFilter(
    const BaseVector& vector,
    const ScanSpec& spec,
    uint64_t* result) {
  if (auto* filter = spec.filter()) {
    bits::forEachSetBit(result, 0, vector.size(), [&](auto row) {
      if (!filterRow(vector, *filter, row)) {
        bits::clearBit(result, row);
      }
    });
  }

  const auto& type = vector.type();
  if (!type->isRow()) {
    // Filters on MAP or ARRAY children are only used for pruning and do not
    // affect the correctness of the result, so they are not evaluated here.
    return;
  }

  const auto& rowType = type->asRow();
  const auto* rowVector = vector.as<RowVector>();
  // Should not have any lazy from non-selective reader.
  VELOX_CHECK_NOT_NULL(rowVector);
  for (const auto& childSpec : spec.children()) {
    const auto childIndex = rowType.getChildIdx(childSpec->fieldName());
    const auto& child = rowVector->childAt(childIndex);
    applyFilter(*child, *childSpec, result);
  }
}

} // namespace

VectorPtr RowReader::projectColumns(
const VectorPtr& input,
const velox::common::ScanSpec& spec) {
const ScanSpec& spec) {
auto* inputRow = input->as<RowVector>();
VELOX_CHECK_NOT_NULL(inputRow);
auto& inputRowType = input->type()->asRow();
Expand All @@ -31,27 +94,43 @@ VectorPtr RowReader::projectColumns(
std::vector<std::string> names(numColumns);
std::vector<TypePtr> types(numColumns);
std::vector<VectorPtr> children(numColumns);
std::vector<uint64_t> passed(bits::nwords(input->size()), -1);
for (auto& childSpec : spec.children()) {
VectorPtr child;
if (childSpec->isConstant()) {
child = BaseVector::wrapInConstant(
input->size(), 0, childSpec->constantValue());
} else {
child =
inputRow->childAt(inputRowType.getChildIdx(childSpec->fieldName()));
applyFilter(*child, *childSpec, passed.data());
}
if (!childSpec->projectOut()) {
continue;
}
auto i = childSpec->channel();
names[i] = childSpec->fieldName();
if (childSpec->isConstant()) {
children[i] = BaseVector::wrapInConstant(
input->size(), 0, childSpec->constantValue());
} else {
children[i] =
inputRow->childAt(inputRowType.getChildIdx(childSpec->fieldName()));
types[i] = child->type();
children[i] = std::move(child);
}
auto rowType = ROW(std::move(names), std::move(types));
auto size = bits::countBits(passed.data(), 0, input->size());
if (size == 0) {
return RowVector::createEmpty(rowType, input->pool());
}
if (size < input->size()) {
auto indices = allocateIndices(size, input->pool());
auto* rawIndices = indices->asMutable<vector_size_t>();
vector_size_t j = 0;
bits::forEachSetBit(
passed.data(), 0, input->size(), [&](auto i) { rawIndices[j++] = i; });
for (auto& child : children) {
child = BaseVector::wrapInDictionary(
nullptr, indices, size, std::move(child));
}
types[i] = children[i]->type();
}
return std::make_shared<RowVector>(
input->pool(),
ROW(std::move(names), std::move(types)),
nullptr,
input->size(),
std::move(children));
input->pool(), rowType, nullptr, size, std::move(children));
}

} // namespace facebook::velox::dwio::common
2 changes: 2 additions & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ add_executable(
LoggedExceptionTest.cpp
RangeTests.cpp
ReadFileInputStreamTests.cpp
ReaderTest.cpp
RetryTests.cpp
TestBufferedInput.cpp
TypeTests.cpp)
Expand All @@ -34,6 +35,7 @@ target_link_libraries(
velox_dwio_common_test
velox_dwio_common_test_utils
velox_temp_path
velox_vector_test_lib
Boost::regex
velox_link_libs
Folly::folly
Expand Down
82 changes: 82 additions & 0 deletions velox/dwio/common/tests/ReaderTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/common/Reader.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

#include <gtest/gtest.h>

namespace facebook::velox::dwio::common {
namespace {

using namespace facebook::velox::common;

// Test fixture for the RowReader tests below. It adds no state of its own; the
// tests call vector-building helpers such as makeRowVector / makeFlatVector
// unqualified, presumably inherited from test::VectorTestBase.
class ReaderTest : public testing::Test, public test::VectorTestBase {};

// A filter on a struct member (c1.c0) must drop rows from the projected
// output even though only the top-level column c0 is projected out.
TEST_F(ReaderTest, projectColumnsFilterStruct) {
  constexpr int kNumRows = 10;
  // c0 holds 0..9; c1 is a struct whose single field c0 holds the same values.
  auto topLevelColumn = makeFlatVector<int64_t>(kNumRows, folly::identity);
  auto structColumn = makeRowVector({
      makeFlatVector<int64_t>(kNumRows, folly::identity),
  });
  auto input = makeRowVector({topLevelColumn, structColumn});

  common::ScanSpec spec("<root>");
  spec.addField("c0", 0);
  auto* nestedSpec = spec.getOrCreateChild(common::Subfield("c1.c0"));
  nestedSpec->setFilter(common::createBigintValues({2, 4, 6}, false));

  auto expected = makeRowVector({
      makeFlatVector<int64_t>({2, 4, 6}),
  });
  auto actual = RowReader::projectColumns(input, spec);
  test::assertEqualVectors(expected, actual);
}

// IS NULL / IS NOT NULL filters on an array column must select exactly the
// rows whose array is null / non-null.
TEST_F(ReaderTest, projectColumnsFilterArray) {
  constexpr int kNumRows = 10;
  // c0 holds 0..9; c1 is a single-element array column that is null on odd
  // rows.
  auto input = makeRowVector({
      makeFlatVector<int64_t>(kNumRows, folly::identity),
      makeArrayVector<int64_t>(
          kNumRows,
          [](auto) { return 1; },
          [](auto idx) { return idx; },
          [](auto row) { return row % 2 != 0; }),
  });
  common::ScanSpec spec("<root>");
  spec.addField("c0", 0);
  auto* arraySpec = spec.getOrCreateChild(common::Subfield("c1"));

  // Installs 'filter' on c1, projects the input and checks that the surviving
  // c0 values are exactly 'expectedC0'.
  auto verify = [&](const char* trace,
                    auto filter,
                    const std::vector<int64_t>& expectedC0) {
    SCOPED_TRACE(trace);
    arraySpec->setFilter(std::move(filter));
    auto actual = RowReader::projectColumns(input, spec);
    auto expected = makeRowVector({
        makeFlatVector<int64_t>(expectedC0),
    });
    test::assertEqualVectors(expected, actual);
  };

  verify("IS NULL", std::make_unique<common::IsNull>(), {1, 3, 5, 7, 9});
  verify("IS NOT NULL", std::make_unique<common::IsNotNull>(), {0, 2, 4, 6, 8});
}

} // namespace
} // namespace facebook::velox::dwio::common
40 changes: 0 additions & 40 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2901,46 +2901,6 @@ TEST_F(TableScanTest, dictionaryMemo) {
#endif
}

// With the table handle built with its last argument set to false (presumably
// filterPushdownEnabled -- confirm against makeTableHandle), a subfield filter
// on the partition key "ds" is accepted, while a filter on the regular column
// "c0" must fail with an "Unexpected filter" user error.
TEST_F(TableScanTest, filterPushdownDisabledChecks) {
  auto data = makeRowVector({makeFlatVector<int32_t>(10, folly::identity)});
  auto rowType = asRowType(data->type());
  auto file = TempFilePath::create();
  writeToFile(file->path, {data});
  ColumnHandleMap assignments = {
      {"ds", partitionKey("ds", VARCHAR())},
      {"c0", regularColumn("c0", INTEGER())},
  };
  auto split = HiveConnectorSplitBuilder(file->path)
                   .partitionKey("ds", "2023-07-12")
                   .build();

  // Builds a table scan plan over "hive_table" using the given subfield
  // filters and the same remaining arguments for both cases below.
  auto makePlan = [&](auto subfieldFilters) {
    auto tableHandle = makeTableHandle(
        std::move(subfieldFilters), nullptr, "hive_table", nullptr, false);
    return exec::test::PlanBuilder(pool_.get())
        .tableScan(rowType, tableHandle, assignments)
        .planNode();
  };

  // Filter on the partition key: the scan succeeds and returns all the data.
  auto partitionKeyPlan = makePlan(
      SubfieldFiltersBuilder().add("ds", equal("2023-07-12")).build());
  AssertQueryBuilder(partitionKeyPlan).splits({split}).assertResults(data);

  // Filter on a regular column: running the scan is expected to throw.
  auto regularColumnPlan =
      makePlan(SubfieldFiltersBuilder().add("c0", equal(5)).build());
  auto query = AssertQueryBuilder(regularColumnPlan);
  query.splits({split});
  VELOX_ASSERT_THROW(
      query.copyResults(pool_.get()),
      "Unexpected filter on table hive_table, field c0");
}

TEST_F(TableScanTest, reuseRowVector) {
auto iota = makeFlatVector<int32_t>(10, folly::identity);
auto data = makeRowVector({iota, makeRowVector({iota})});
Expand Down

0 comments on commit fbc6dcf

Please sign in to comment.