diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index e120f6375982..9f86026c558c 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -412,40 +412,14 @@ HiveDataSource::HiveDataSource( } SubfieldFilters filters; - core::TypedExprPtr remainingFilter; - if (hiveTableHandle_->isFilterPushdownEnabled()) { - for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) { - filters.emplace(k.clone(), v->clone()); - } - remainingFilter = extractFiltersFromRemainingFilter( - hiveTableHandle_->remainingFilter(), - expressionEvaluator_, - false, - filters); - } else { - for (auto& [field, _] : hiveTableHandle_->subfieldFilters()) { - VELOX_USER_CHECK_EQ( - field.path().size(), - 1, - "Unexpected filter on table {}, field {}", - hiveTableHandle_->tableName(), - field.toString()); - auto* nestedField = dynamic_cast( - field.path()[0].get()); - VELOX_USER_CHECK_NOT_NULL( - nestedField, - "Unexpected filter on table {}, field {}", - hiveTableHandle_->tableName(), - field.toString()); - VELOX_USER_CHECK_GT( - partitionKeys_.count(nestedField->name()), - 0, - "Unexpected filter on table {}, field {}", - hiveTableHandle_->tableName(), - field.toString()); - } - remainingFilter = hiveTableHandle_->remainingFilter(); - } + for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) { + filters.emplace(k.clone(), v->clone()); + } + auto remainingFilter = extractFiltersFromRemainingFilter( + hiveTableHandle_->remainingFilter(), + expressionEvaluator_, + false, + filters); std::vector remainingFilterSubfields; if (remainingFilter) { diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index 85e56bc44c8b..cf9742b92094 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -18,9 +18,72 @@ namespace facebook::velox::dwio::common { +using namespace velox::common; + +namespace { + +template +bool filterSimpleVectorRow( + const BaseVector& vector, + Filter& filter, + vector_size_t index) { + using T = typename TypeTraits::NativeType; + auto* simpleVector = vector.asUnchecked>(); + return applyFilter(filter, simpleVector->valueAt(index)); +} + +bool filterRow(const BaseVector& vector, Filter& filter, vector_size_t index) { + if (vector.isNullAt(index)) { + return filter.testNull(); + } + switch (vector.typeKind()) { + case TypeKind::ARRAY: + case TypeKind::MAP: + case TypeKind::ROW: + VELOX_USER_CHECK( + filter.kind() == FilterKind::kIsNull || + filter.kind() == FilterKind::kIsNotNull, + "Complex type can only take null filter, got {}", + filter.toString()); + return filter.testNonNull(); + default: + return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + filterSimpleVectorRow, vector.typeKind(), vector, filter, index); + } +} + +void applyFilter( + const BaseVector& vector, + const ScanSpec& spec, + uint64_t* result) { + if (spec.filter()) { + bits::forEachSetBit(result, 0, vector.size(), [&](auto i) { + if (!filterRow(vector, *spec.filter(), i)) { + bits::clearBit(result, i); + } + }); + } + if (!vector.type()->isRow()) { + // Filter on MAP or ARRAY children are pruning, and won't affect correctness + // of the result. + return; + } + auto& rowType = vector.type()->asRow(); + auto* rowVector = vector.as(); + // Should not have any lazy from non-selective reader. + VELOX_CHECK_NOT_NULL(rowVector); + for (auto& childSpec : spec.children()) { + auto child = + rowVector->childAt(rowType.getChildIdx(childSpec->fieldName())); + applyFilter(*child, *childSpec, result); + } +} + +} // namespace + VectorPtr RowReader::projectColumns( const VectorPtr& input, - const velox::common::ScanSpec& spec) { + const ScanSpec& spec) { auto* inputRow = input->as(); VELOX_CHECK_NOT_NULL(inputRow); auto& inputRowType = input->type()->asRow(); @@ -31,27 +94,43 @@ VectorPtr RowReader::projectColumns( std::vector names(numColumns); std::vector types(numColumns); std::vector children(numColumns); + std::vector passed(bits::nwords(input->size()), -1); for (auto& childSpec : spec.children()) { + VectorPtr child; + if (childSpec->isConstant()) { + child = BaseVector::wrapInConstant( + input->size(), 0, childSpec->constantValue()); + } else { + child = + inputRow->childAt(inputRowType.getChildIdx(childSpec->fieldName())); + applyFilter(*child, *childSpec, passed.data()); + } if (!childSpec->projectOut()) { continue; } auto i = childSpec->channel(); names[i] = childSpec->fieldName(); - if (childSpec->isConstant()) { - children[i] = BaseVector::wrapInConstant( - input->size(), 0, childSpec->constantValue()); - } else { - children[i] = - inputRow->childAt(inputRowType.getChildIdx(childSpec->fieldName())); + types[i] = child->type(); + children[i] = std::move(child); + } + auto rowType = ROW(std::move(names), std::move(types)); + auto size = bits::countBits(passed.data(), 0, input->size()); + if (size == 0) { + return RowVector::createEmpty(rowType, input->pool()); + } + if (size < input->size()) { + auto indices = allocateIndices(size, input->pool()); + auto* rawIndices = indices->asMutable(); + vector_size_t j = 0; + bits::forEachSetBit( + passed.data(), 0, input->size(), [&](auto i) { rawIndices[j++] = i; }); + for (auto& child : children) { + child = BaseVector::wrapInDictionary( + nullptr, indices, size, std::move(child)); } - types[i] = children[i]->type(); } return std::make_shared( - input->pool(), - ROW(std::move(names), std::move(types)), - nullptr, - input->size(), - std::move(children)); + input->pool(), rowType, nullptr, size, std::move(children)); } } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index 4487b18ce441..853b9241fef1 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -26,6 +26,7 @@ add_executable( LoggedExceptionTest.cpp RangeTests.cpp ReadFileInputStreamTests.cpp + ReaderTest.cpp RetryTests.cpp TestBufferedInput.cpp TypeTests.cpp) @@ -34,6 +35,7 @@ target_link_libraries( velox_dwio_common_test velox_dwio_common_test_utils velox_temp_path + velox_vector_test_lib Boost::regex velox_link_libs Folly::folly diff --git a/velox/dwio/common/tests/ReaderTest.cpp b/velox/dwio/common/tests/ReaderTest.cpp new file mode 100644 index 000000000000..1be4039059ab --- /dev/null +++ b/velox/dwio/common/tests/ReaderTest.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/Reader.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +#include + +namespace facebook::velox::dwio::common { +namespace { + +using namespace facebook::velox::common; + +class ReaderTest : public testing::Test, public test::VectorTestBase {}; + +TEST_F(ReaderTest, projectColumnsFilterStruct) { + constexpr int kSize = 10; + auto input = makeRowVector({ + makeFlatVector(kSize, folly::identity), + makeRowVector({ + makeFlatVector(kSize, folly::identity), + }), + }); + common::ScanSpec spec(""); + spec.addField("c0", 0); + spec.getOrCreateChild(common::Subfield("c1.c0")) + ->setFilter(common::createBigintValues({2, 4, 6}, false)); + auto actual = RowReader::projectColumns(input, spec); + auto expected = makeRowVector({ + makeFlatVector({2, 4, 6}), + }); + test::assertEqualVectors(expected, actual); +} + +TEST_F(ReaderTest, projectColumnsFilterArray) { + constexpr int kSize = 10; + auto input = makeRowVector({ + makeFlatVector(kSize, folly::identity), + makeArrayVector( + kSize, + [](auto) { return 1; }, + [](auto i) { return i; }, + [](auto i) { return i % 2 != 0; }), + }); + common::ScanSpec spec(""); + spec.addField("c0", 0); + auto* c1 = spec.getOrCreateChild(common::Subfield("c1")); + { + SCOPED_TRACE("IS NULL"); + c1->setFilter(std::make_unique()); + auto actual = RowReader::projectColumns(input, spec); + auto expected = makeRowVector({ + makeFlatVector({1, 3, 5, 7, 9}), + }); + test::assertEqualVectors(expected, actual); + } + { + SCOPED_TRACE("IS NOT NULL"); + c1->setFilter(std::make_unique()); + auto actual = RowReader::projectColumns(input, spec); + auto expected = makeRowVector({ + makeFlatVector({0, 2, 4, 6, 8}), + }); + test::assertEqualVectors(expected, actual); + } +} + +} // namespace +} // namespace facebook::velox::dwio::common diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 148923c1e20b..ed322fcc9753 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2901,46 +2901,6 @@ TEST_F(TableScanTest, dictionaryMemo) { #endif } -TEST_F(TableScanTest, filterPushdownDisabledChecks) { - auto data = makeRowVector({makeFlatVector(10, folly::identity)}); - auto rowType = asRowType(data->type()); - auto file = TempFilePath::create(); - writeToFile(file->path, {data}); - ColumnHandleMap assignments = { - {"ds", partitionKey("ds", VARCHAR())}, - {"c0", regularColumn("c0", INTEGER())}, - }; - auto split = HiveConnectorSplitBuilder(file->path) - .partitionKey("ds", "2023-07-12") - .build(); - - auto tableHandle = makeTableHandle( - SubfieldFiltersBuilder().add("ds", equal("2023-07-12")).build(), - nullptr, - "hive_table", - nullptr, - false); - auto plan = exec::test::PlanBuilder(pool_.get()) - .tableScan(rowType, tableHandle, assignments) - .planNode(); - AssertQueryBuilder(plan).splits({split}).assertResults(data); - - tableHandle = makeTableHandle( - SubfieldFiltersBuilder().add("c0", equal(5)).build(), - nullptr, - "hive_table", - nullptr, - false); - plan = exec::test::PlanBuilder(pool_.get()) - .tableScan(rowType, tableHandle, assignments) - .planNode(); - auto query = AssertQueryBuilder(plan); - query.splits({split}); - VELOX_ASSERT_THROW( - query.copyResults(pool_.get()), - "Unexpected filter on table hive_table, field c0"); -} - TEST_F(TableScanTest, reuseRowVector) { auto iota = makeFlatVector(10, folly::identity); auto data = makeRowVector({iota, makeRowVector({iota})});