Skip to content

Commit 10eedbe

Browse files
authored
GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
### Rationale for this change

When setting projections/filters for the file system scanner, the Parquet implementation requires that all materialized `FieldRef`s be position-independent (containing only names). However, it may be useful to support index-based field lookups as well - assuming the dataset schema is known.

### What changes are included in this PR?

Adds a translation step for field refs prior to looking them up in the fragment schema. A known dataset schema is required to do this reliably, however (since the fragment schema may be a sub/superset of the dataset schema) - so in the absence of one, we fall back to the existing behavior.

### Are these changes tested?

Yes (tests are included)

### Are there any user-facing changes?

Yes

* Closes: #35579

Authored-by: benibus <bpharks@gmx.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
1 parent 10708b3 commit 10eedbe

File tree

4 files changed

+87
-5
lines changed

4 files changed

+87
-5
lines changed

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,28 @@ Status ResolveOneFieldRef(
224224
return Status::OK();
225225
}
226226

227+
// Converts a field ref into a position-independent ref (containing only a sequence of
228+
// names) based on the dataset schema. Returns `ref` unchanged if no conversion was needed.
//
// Any error from `ref.FindOne(dataset_schema)` is propagated to the caller
// (presumably raised when the ref does not resolve to a single field -- confirm against
// FieldRef::FindOne).
229+
Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
// Fast path: a ref that is already a name (or a sequence of names) is handed back as-is.
230+
if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
231+
return std::move(ref);
232+
}
233+
// Resolve the ref against the dataset schema to obtain a path of field indices.
234+
ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
235+
std::vector<FieldRef> named_refs;
236+
named_refs.reserve(path.indices().size());
237+
// Walk down the schema one nesting level per index, recording the name of the field
// selected at each level and then descending into that field's children.
238+
const FieldVector* child_fields = &dataset_schema.fields();
239+
for (auto index : path) {
240+
const auto& child_field = *(*child_fields)[index];
241+
named_refs.emplace_back(child_field.name());
242+
child_fields = &child_field.type()->fields();
243+
}
244+
// A single collected name is returned as a plain name ref; several names are wrapped
// in one ref built from the whole sequence.
245+
return named_refs.size() == 1 ? std::move(named_refs[0])
246+
: FieldRef(std::move(named_refs));
247+
}
248+
227249
// Compute the column projection based on the scan options
228250
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
229251
const ScanOptions& options) {
@@ -248,7 +270,13 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
248270
}
249271

250272
std::vector<int> columns_selection;
251-
for (const auto& ref : field_refs) {
273+
for (auto& ref : field_refs) {
274+
// In the (unlikely) absence of a known dataset schema, we require that all
275+
// materialized refs are named.
276+
if (options.dataset_schema) {
277+
ARROW_ASSIGN_OR_RAISE(
278+
ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
279+
}
252280
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
253281
&columns_selection));
254282
}

cpp/src/arrow/dataset/file_parquet_test.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,42 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
678678
CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
679679
}
680680

681+
// Tests projection with nested/indexed FieldRefs.
682+
// https://github.com/apache/arrow/issues/35579
//
// A single top-level struct column ("info") with a nested struct ("data") is written
// out, and the same leaf is then projected through several differently-spelled refs.
683+
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
684+
auto table_schema = schema(
685+
{field("info", struct_({field("name", utf8()),
686+
field("data", struct_({field("amount", float64()),
687+
field("percent", float32())}))}))});
688+
auto table = TableFromJSON(table_schema, {R"([
689+
{"info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
690+
{"info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
691+
{"info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
692+
{"info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
693+
{"info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
694+
{"info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
695+
ASSERT_OK_AND_ASSIGN(auto expected_batch, table->CombineChunksToBatch());
696+
697+
TableBatchReader reader(*table);
// The dataset schema used by the fixture is taken from the table being written.
698+
SetSchema(reader.schema()->fields());
699+
700+
auto source = GetFileSource(&reader);
701+
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
702+
// Every ref below addresses the same leaf, info.data.percent ("info" is top-level
// field 0, "data" is child 1 of "info", "percent" is child 1 of "data"), spelled with
// different mixes of names, indices and nested refs.
703+
std::vector<FieldRef> equivalent_refs = {
704+
FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
705+
FieldRef(0, 1, "percent"), FieldRef(0, 1, 1),
706+
FieldRef(0, FieldRef("data", 1)), FieldRef(FieldRef(0), FieldRef(1, 1)),
707+
};
708+
for (const auto& ref : equivalent_refs) {
// Tag any failure in this iteration with the ref being exercised.
709+
ARROW_SCOPED_TRACE("ref = ", ref.ToString());
710+
// NOTE(review): the scanned batch is compared against the full input batch even though
// only one leaf is projected (as "value"); presumably SingleBatch yields the
// fragment's columns rather than the projected output -- confirm.
711+
Project({field_ref(ref)}, {"value"});
712+
auto batch = SingleBatch(fragment);
713+
AssertBatchesEqual(*expected_batch, *batch);
714+
}
715+
}
716+
681717
INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
682718
::testing::ValuesIn(TestFormatParams::Values()),
683719
TestFormatParams::ToTestNameString);

cpp/src/arrow/dataset/test_util_internal.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,16 +516,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
516516
SetProjection(opts_.get(), std::move(projection));
517517
}
518518

519+
// Sets the scan projection on opts_ from arbitrary expressions.
//
// `exprs` are the expressions to project and `names` the names given to them; both are
// passed to ProjectionDescr::FromExpressions together with opts_->dataset_schema, so
// the dataset schema must already be set on opts_ (it is dereferenced here).
void Project(std::vector<compute::Expression> exprs, std::vector<std::string> names) {
520+
ASSERT_OK_AND_ASSIGN(auto projection,
521+
ProjectionDescr::FromExpressions(
522+
std::move(exprs), std::move(names), *opts_->dataset_schema));
523+
SetProjection(opts_.get(), std::move(projection));
524+
}
525+
519526
// Sets the scan projection from dot-path strings.
//
// Each entry of `names` is parsed with FieldRef::FromDotPath into a field-ref
// expression; the same strings are then used as the names of the projected
// expressions.
void ProjectNested(std::vector<std::string> names) {
520527
std::vector<compute::Expression> exprs;
521528
for (const auto& name : names) {
522529
ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
523530
exprs.push_back(field_ref(ref));
524531
}
525-
ASSERT_OK_AND_ASSIGN(
526-
auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
527-
*opts_->dataset_schema));
528-
SetProjection(opts_.get(), std::move(descr));
532+
Project(std::move(exprs), std::move(names));
529533
}
530534

531535
// Shared test cases

cpp/src/arrow/type.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,6 +1823,20 @@ class ARROW_EXPORT FieldRef : public util::EqualityComparable<FieldRef> {
18231823
return true;
18241824
}
18251825

1826+
/// \brief Return true if this ref is a name or a nested sequence of only names
1827+
///
1828+
/// Useful for determining if iteration is possible without recursion or inner loops
///
/// A nested ref qualifies only when it is non-empty and every direct child is itself
/// a name; any other kind of ref yields false.
1829+
bool IsNameSequence() const {
1830+
if (IsName()) return true;
1831+
if (const auto* nested = nested_refs()) {
1832+
for (const auto& ref : *nested) {
1833+
if (!ref.IsName()) return false;
1834+
}
// All children are names; an empty nested ref is still not a name sequence.
1835+
return !nested->empty();
1836+
}
// Neither a name nor a nested ref.
1837+
return false;
1838+
}
1839+
18261840
const FieldPath* field_path() const {
18271841
return IsFieldPath() ? &std::get<FieldPath>(impl_) : NULLPTR;
18281842
}

0 commit comments

Comments
 (0)