Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ Status ResolveOneFieldRef(
return Status::OK();
}

// Converts a field ref into a position-independent ref (containing only a sequence of
// names) based on the dataset schema. Returns `false` if no conversion was needed.
Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
return std::move(ref);
}

ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
std::vector<FieldRef> named_refs;
named_refs.reserve(path.indices().size());

const FieldVector* child_fields = &dataset_schema.fields();
for (auto index : path) {
const auto& child_field = *(*child_fields)[index];
named_refs.emplace_back(child_field.name());
child_fields = &child_field.type()->fields();
}

return named_refs.size() == 1 ? std::move(named_refs[0])
: FieldRef(std::move(named_refs));
}

// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
Expand All @@ -248,7 +270,13 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
}

std::vector<int> columns_selection;
for (const auto& ref : field_refs) {
for (auto& ref : field_refs) {
// In the (unlikely) absence of a known dataset schema, we require that all
// materialized refs are named.
if (options.dataset_schema) {
ARROW_ASSIGN_OR_RAISE(
ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
}
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,42 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
}

// Tests projection with nested/indexed FieldRefs.
// https://github.com/apache/arrow/issues/35579
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
auto table_schema = schema(
{field("info", struct_({field("name", utf8()),
field("data", struct_({field("amount", float64()),
field("percent", float32())}))}))});
auto table = TableFromJSON(table_schema, {R"([
{"info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
{"info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
{"info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
{"info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
{"info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
{"info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
ASSERT_OK_AND_ASSIGN(auto expected_batch, table->CombineChunksToBatch());

TableBatchReader reader(*table);
SetSchema(reader.schema()->fields());

auto source = GetFileSource(&reader);
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));

std::vector<FieldRef> equivalent_refs = {
FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
FieldRef(0, 1, "percent"), FieldRef(0, 1, 1),
FieldRef(0, FieldRef("data", 1)), FieldRef(FieldRef(0), FieldRef(1, 1)),
};
for (const auto& ref : equivalent_refs) {
ARROW_SCOPED_TRACE("ref = ", ref.ToString());

Project({field_ref(ref)}, {"value"});
auto batch = SingleBatch(fragment);
AssertBatchesEqual(*expected_batch, *batch);
}
}

INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
SetProjection(opts_.get(), std::move(projection));
}

void Project(std::vector<compute::Expression> exprs, std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::FromExpressions(
std::move(exprs), std::move(names), *opts_->dataset_schema));
SetProjection(opts_.get(), std::move(projection));
}

void ProjectNested(std::vector<std::string> names) {
std::vector<compute::Expression> exprs;
for (const auto& name : names) {
ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
exprs.push_back(field_ref(ref));
}
ASSERT_OK_AND_ASSIGN(
auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
*opts_->dataset_schema));
SetProjection(opts_.get(), std::move(descr));
Project(std::move(exprs), std::move(names));
}

// Shared test cases
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,20 @@ class ARROW_EXPORT FieldRef : public util::EqualityComparable<FieldRef> {
return true;
}

/// \brief Return true if this ref is a name or a nested sequence of only names
///
/// Useful for determining if iteration is possible without recursion or inner loops
bool IsNameSequence() const {
if (IsName()) return true;
if (const auto* nested = nested_refs()) {
for (const auto& ref : *nested) {
if (!ref.IsName()) return false;
}
return !nested->empty();
}
return false;
}

const FieldPath* field_path() const {
return IsFieldPath() ? &std::get<FieldPath>(impl_) : NULLPTR;
}
Expand Down