diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 3eb4c2b03f4..c02f08431b6 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -124,22 +124,32 @@ static Result<SchemaManifest> GetSchemaManifest(
   return manifest;
 }
 
-static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
+static std::shared_ptr<StructScalar> MakeMinMaxScalar(std::shared_ptr<Scalar> min,
+                                                      std::shared_ptr<Scalar> max) {
+  DCHECK(min->type->Equals(max->type));
+  return std::make_shared<StructScalar>(ScalarVector{min, max},
+                                        struct_({
+                                            field("min", min->type),
+                                            field("max", max->type),
+                                        }));
+}
+
+static std::shared_ptr<StructScalar> ColumnChunkStatisticsAsStructScalar(
     const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
   // For the remaining of this function, failure to extract/parse statistics
-  // are ignored by returning the `true` scalar. The goal is two fold. First
-  // avoid that an optimization break the computation. Second, allow the
+  // are ignored by returning nullptr. The goal is twofold. First, avoid
+  // having an optimization break the computation. Second, allow the
   // following columns to maybe succeed in extracting column statistics.
 
   // For now, only leaf (primitive) types are supported.
   if (!schema_field.is_leaf()) {
-    return scalar(true);
+    return nullptr;
   }
 
   auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
   auto statistics = column_metadata->statistics();
   if (statistics == nullptr) {
-    return scalar(true);
+    return nullptr;
   }
 
   const auto& field = schema_field.field;
@@ -147,28 +157,31 @@ static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
 
   // Optimize for corner case where all values are nulls
   if (statistics->num_values() == statistics->null_count()) {
-    return equal(field_expr, scalar(MakeNullScalar(field->type())));
+    auto null = MakeNullScalar(field->type());
+    return MakeMinMaxScalar(null, null);
   }
 
   std::shared_ptr<Scalar> min, max;
   if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
-    return scalar(true);
+    return nullptr;
   }
 
-  return and_(greater_equal(field_expr, scalar(min)),
-              less_equal(field_expr, scalar(max)));
+  return MakeMinMaxScalar(std::move(min), std::move(max));
 }
 
-static std::shared_ptr<Expression> RowGroupStatisticsAsExpression(
+static std::shared_ptr<StructScalar> RowGroupStatisticsAsStructScalar(
     const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) {
-  const auto& fields = manifest.schema_fields;
-  ExpressionVector expressions;
-  expressions.reserve(fields.size());
-  for (const auto& field : fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata));
+  FieldVector fields;
+  ScalarVector statistics;
+  for (const auto& schema_field : manifest.schema_fields) {
+    if (auto min_max = ColumnChunkStatisticsAsStructScalar(schema_field, metadata)) {
+      fields.push_back(field(schema_field.field->name(), min_max->type));
+      statistics.push_back(std::move(min_max));
+    }
   }
 
-  return expressions.empty() ? scalar(true) : and_(expressions);
+  return std::make_shared<StructScalar>(std::move(statistics),
+                                        struct_(std::move(fields)));
 }
 
 class ParquetScanTaskIterator {
@@ -349,7 +362,7 @@ static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
     if (!info.HasStatistics() && info.id() < num_row_groups) {
       auto row_group = metadata->RowGroup(info.id());
       info.set_num_rows(row_group->num_rows());
-      info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest));
+      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, manifest));
     }
   };
   std::for_each(row_groups.begin(), row_groups.end(), augment);
@@ -444,8 +457,39 @@ std::vector<RowGroupInfo> RowGroupInfo::FromCount(int count) {
   return result;
 }
 
+void RowGroupInfo::SetStatisticsExpression() {
+  if (!HasStatistics()) {
+    statistics_expression_ = nullptr;
+    return;
+  }
+
+  if (statistics_->value.empty()) {
+    statistics_expression_ = scalar(true);
+    return;
+  }
+
+  ExpressionVector expressions{statistics_->value.size()};
+
+  for (size_t i = 0; i < expressions.size(); ++i) {
+    const auto& col_stats =
+        internal::checked_cast<const StructScalar&>(*statistics_->value[i]);
+    auto field_expr = field_ref(statistics_->type->field(static_cast<int>(i))->name());
+
+    DCHECK_EQ(col_stats.value.size(), 2);
+    const auto& min = col_stats.value[0];
+    const auto& max = col_stats.value[1];
+
+    DCHECK_EQ(min->is_valid, max->is_valid);
+    expressions[i] = min->is_valid ? and_(greater_equal(field_expr, scalar(min)),
+                                          less_equal(field_expr, scalar(max)))
+                                   : equal(std::move(field_expr), scalar(min));
+  }
+
+  statistics_expression_ = and_(std::move(expressions));
+}
+
 bool RowGroupInfo::Satisfy(const Expression& predicate) const {
-  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_);
+  return !HasStatistics() || predicate.IsSatisfiableWith(statistics_expression_);
 }
 
 ///
@@ -622,7 +666,7 @@ ParquetDatasetFactory::CollectParquetFragments(
     auto row_group = metadata.RowGroup(i);
     ARROW_ASSIGN_OR_RAISE(auto path,
                           FileFromRowGroup(filesystem_.get(), base_path_, *row_group));
-    auto stats = RowGroupStatisticsAsExpression(*row_group, manifest);
+    auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
     auto num_rows = row_group->num_rows();
 
     // Insert the path, or increase the count of row groups. It will be
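Note (annotation, not part of the patch): SetStatisticsExpression rebuilds, from the stored min/max StructScalar, the same per-column range predicate that RowGroupStatisticsAsExpression used to produce directly. In pyarrow expression terms the result is equivalent to the following sketch, where the column names f1/f2 and their ranges are illustrative:

import pyarrow.dataset as ds

# Equivalent of the filter derived from a row group whose statistics say
# f1 spans [0, 1] and f2 spans [1, 1] (illustrative values):
min_max_filter = ((ds.field('f1') >= 0) & (ds.field('f1') <= 1) &
                  (ds.field('f2') >= 1) & (ds.field('f2') <= 1))
# An all-null column instead contributes an equality against a null scalar,
# mirroring the `equal(std::move(field_expr), scalar(min))` branch above.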
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index f5fba7ea7f9..182280e68f7 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -134,8 +134,10 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
   explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
 
   /// \brief Construct a RowGroup from an identifier with statistics.
-  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<Expression> statistics)
-      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr<StructScalar> statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {
+    SetStatisticsExpression();
+  }
 
   /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
   static std::vector<RowGroupInfo> FromIdentifiers(const std::vector<int> ids);
@@ -150,10 +152,13 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
   int64_t num_rows() const { return num_rows_; }
   void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
 
-  /// \brief Return the RowGroup's statistics
-  const std::shared_ptr<Expression>& statistics() const { return statistics_; }
-  void set_statistics(std::shared_ptr<Expression> statistics) {
+  /// \brief Return the RowGroup's statistics as a StructScalar with a field for
+  /// each column with statistics.
+  /// Each field will also be a StructScalar with "min" and "max" fields.
+  const std::shared_ptr<StructScalar>& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr<StructScalar> statistics) {
     statistics_ = std::move(statistics);
+    SetStatisticsExpression();
   }
 
   /// \brief Indicate if statistics are set.
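The statistics accessor documented above is nested: one field per column that has statistics, each itself a struct with "min" and "max" children. As a Python sketch (illustrative column names and types, not part of the patch), the corresponding Arrow type for two int64 columns f1 and f2 would be:

import pyarrow as pa

# Nested type of the per-row-group statistics StructScalar (illustrative):
statistics_type = pa.struct([
    ('f1', pa.struct([('min', pa.int64()), ('max', pa.int64())])),
    ('f2', pa.struct([('min', pa.int64()), ('max', pa.int64())])),
])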
@@ -169,9 +174,12 @@ class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable<RowGroupInfo> {
   bool HasNumRows() const { return num_rows_ != -1; }
 
  private:
+  void SetStatisticsExpression();
+
   int id_;
   int64_t num_rows_;
-  std::shared_ptr<Expression> statistics_;
+  std::shared_ptr<Expression> statistics_expression_;
+  std::shared_ptr<StructScalar> statistics_;
 };
 
 /// \brief A FileFragment with parquet logic.
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 5ffb11f34c8..8047c5f62c0 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -845,6 +845,28 @@ cdef class RowGroupInfo:
     def num_rows(self):
         return self.info.num_rows()
 
+    @property
+    def statistics(self):
+        if not self.info.HasStatistics():
+            return None
+
+        cdef:
+            CStructScalar* c_statistics
+            CStructScalar* c_minmax
+
+        statistics = dict()
+        c_statistics = self.info.statistics().get()
+        for i in range(c_statistics.value.size()):
+            name = frombytes(c_statistics.type.get().field(i).get().name())
+            c_minmax = <CStructScalar*> c_statistics.value[i].get()
+
+            statistics[name] = {
+                'min': pyarrow_wrap_scalar(c_minmax.value[0]).as_py(),
+                'max': pyarrow_wrap_scalar(c_minmax.value[1]).as_py(),
+            }
+
+        return statistics
+
     def __eq__(self, other):
         if not isinstance(other, RowGroupInfo):
             return False
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9f7b7b3cec8..d2315ee30dd 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -886,6 +886,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass CStringScalar" arrow::StringScalar"(CScalar):
         shared_ptr[CBuffer] value
 
+    cdef cppclass CStructScalar" arrow::StructScalar"(CScalar):
+        vector[shared_ptr[CScalar]] value
+
     shared_ptr[CScalar] MakeScalar[Value](Value value)
 
     shared_ptr[CScalar] MakeStringScalar" arrow::MakeScalar"(c_string value)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index f80fdb9aba0..6823bd995c7 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -216,11 +216,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo":
         CRowGroupInfo()
         CRowGroupInfo(int id)
-        CRowGroupInfo(
-            int id, int64_t n_rows, shared_ptr[CExpression] statistics)
         int id() const
         int64_t num_rows() const
         bint Equals(const CRowGroupInfo& other)
+        c_bool HasStatistics() const
+        shared_ptr[CStructScalar] statistics() const
 
     cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
         CFileFragment):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 08fec4d376e..2a59f62c207 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -715,6 +715,12 @@ def test_fragments_parquet_row_groups(tempdir):
     assert len(result) == 2
     assert result.equals(table.slice(0, 2))
 
+    assert row_group_fragments[0].row_groups is not None
+    assert row_group_fragments[0].row_groups[0].statistics == {
+        'f1': {'min': 0, 'max': 1},
+        'f2': {'min': 1, 'max': 1},
+    }
+
     fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
     row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
     assert len(row_group_fragments) == 1
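A minimal usage sketch of the new Python-level property (not part of the patch; the dataset path is a placeholder, and the argument-less split_by_row_group call assumes its filter is optional):

import pyarrow.dataset as ds

# Inspect per-row-group min/max statistics of a Parquet dataset.
dataset = ds.dataset('/path/to/parquet_dir', format='parquet')
for fragment in dataset.get_fragments():
    for rg_fragment in fragment.split_by_row_group():
        for row_group in rg_fragment.row_groups:
            print(row_group.num_rows, row_group.statistics)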