diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc index 16d674bb16e..40e15560681 100644 --- a/cpp/examples/arrow/dataset-parquet-scan-example.cc +++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc @@ -15,11 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include -#include - #include - #include #include #include @@ -27,6 +23,10 @@ #include #include #include +#include + +#include +#include using arrow::field; using arrow::int16; @@ -73,12 +73,12 @@ std::shared_ptr GetFileSystemFromUri(const std::string& uri, return fs::FileSystemFromUri(uri, path).ValueOrDie(); } -std::shared_ptr GetDatasetFromPath(std::shared_ptr fs, - std::shared_ptr format, - std::string path) { +std::shared_ptr GetDatasetFromDirectory( + std::shared_ptr fs, std::shared_ptr format, + std::string dir) { // Find all files under `path` fs::FileSelector s; - s.base_dir = path; + s.base_dir = dir; s.recursive = true; ds::FileSystemFactoryOptions options; @@ -97,6 +97,50 @@ std::shared_ptr GetDatasetFromPath(std::shared_ptr return dataset.ValueOrDie(); } +std::shared_ptr GetParquetDatasetFromMetadata( + std::shared_ptr fs, std::shared_ptr format, + std::string metadata_path) { + auto factory = ds::ParquetDatasetFactory::Make(metadata_path, fs, format).ValueOrDie(); + return factory->Finish().ValueOrDie(); +} + +std::shared_ptr GetDatasetFromFile( + std::shared_ptr fs, std::shared_ptr format, + std::string file) { + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + auto factory = ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie(); + + // Try to infer a common schema for all files. + auto schema = factory->Inspect(conf.inspect_options).ValueOrDie(); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + auto child = factory->Finish(conf.finish_options).ValueOrDie(); + + ds::DatasetVector children{conf.repeat, child}; + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset.ValueOrDie(); +} + +std::shared_ptr GetDatasetFromPath( + std::shared_ptr fs, std::shared_ptr format, + std::string path) { + auto info = fs->GetFileInfo(path).ValueOrDie(); + if (info.IsDirectory()) { + return GetDatasetFromDirectory(fs, format, path); + } + + auto dirname_basename = arrow::fs::internal::GetAbstractPathParent(path); + auto basename = dirname_basename.second; + + if (basename == "_metadata") { + return GetParquetDatasetFromMetadata(fs, format, path); + } + + return GetDatasetFromFile(fs, format, path); +} + std::shared_ptr GetScannerFromDataset(std::shared_ptr dataset, std::vector columns, std::shared_ptr filter, diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 0c895784ff3..23f45aee32f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -18,6 +18,7 @@ #include "arrow/dataset/file_parquet.h" #include +#include #include #include #include @@ -25,6 +26,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/filter.h" #include "arrow/dataset/scanner.h" +#include "arrow/filesystem/path_util.h" #include "arrow/table.h" #include "arrow/util/iterator.h" #include "arrow/util/range.h" @@ -44,12 +46,12 @@ using parquet::arrow::StatisticsAsScalars; /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file. 
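/// (One such task is produced per selected RowGroup, so a scanner can execute
/// them independently; the record batches are only materialized when the task
/// is executed, as noted in the comment inside Execute() below.)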
class ParquetScanTask : public ScanTask { public: - ParquetScanTask(int row_group, std::vector column_projection, + ParquetScanTask(RowGroupInfo row_group, std::vector column_projection, std::shared_ptr reader, std::shared_ptr options, std::shared_ptr context) : ScanTask(std::move(options), std::move(context)), - row_group_(row_group), + row_group_(std::move(row_group)), column_projection_(std::move(column_projection)), reader_(std::move(reader)) {} @@ -61,13 +63,13 @@ class ParquetScanTask : public ScanTask { // Thus the memory incurred by the RecordBatchReader is allocated when // Scan is called. std::unique_ptr record_batch_reader; - RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_}, column_projection_, + RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_.id()}, column_projection_, &record_batch_reader)); return IteratorFromReader(std::move(record_batch_reader)); } private: - int row_group_; + RowGroupInfo row_group_; std::vector column_projection_; // The ScanTask _must_ hold a reference to reader_ because there's no // guarantee the producing ParquetScanTaskIterator is still alive. This is a @@ -102,14 +104,12 @@ static parquet::ReaderProperties MakeReaderProperties( } static parquet::ArrowReaderProperties MakeArrowReaderProperties( - const ParquetFileFormat& format, int64_t batch_size, - const parquet::ParquetFileReader& reader) { + const ParquetFileFormat& format, const parquet::FileMetaData& metadata) { parquet::ArrowReaderProperties properties(/* use_threads = */ false); for (const std::string& name : format.reader_options.dict_columns) { - auto column_index = reader.metadata()->schema()->ColumnIndex(name); + auto column_index = metadata.schema()->ColumnIndex(name); properties.set_read_dictionary(column_index, true); } - properties.set_batch_size(batch_size); return properties; } @@ -136,19 +136,14 @@ static std::shared_ptr ColumnChunkStatisticsAsExpression( } auto column_metadata = metadata.ColumnChunk(schema_field.column_index); - auto field = schema_field.field; - auto field_expr = field_ref(field->name()); - - // In case of missing statistics, return nothing. - if (!column_metadata->is_stats_set()) { - return scalar(true); - } - auto statistics = column_metadata->statistics(); if (statistics == nullptr) { return scalar(true); } + const auto& field = schema_field.field; + auto field_expr = field_ref(field->name()); + // Optimize for corner case where all values are nulls if (statistics->num_values() == statistics->null_count()) { return equal(field_expr, scalar(MakeNullScalar(field->type()))); @@ -163,126 +158,50 @@ static std::shared_ptr ColumnChunkStatisticsAsExpression( less_equal(field_expr, scalar(max))); } -static Result> RowGroupStatisticsAsExpression( - const parquet::RowGroupMetaData& metadata, - const parquet::ArrowReaderProperties& properties) { - ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties)); - +static std::shared_ptr RowGroupStatisticsAsExpression( + const parquet::RowGroupMetaData& metadata, const SchemaManifest& manifest) { + const auto& fields = manifest.schema_fields; ExpressionVector expressions; - for (const auto& schema_field : manifest.schema_fields) { - expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata)); + expressions.reserve(fields.size()); + for (const auto& field : fields) { + expressions.emplace_back(ColumnChunkStatisticsAsExpression(field, metadata)); } return expressions.empty() ? 
scalar(true) : and_(expressions); } -// Skip RowGroups with a filter and metadata -class RowGroupSkipper { - public: - static constexpr int kIterationDone = -1; - - RowGroupSkipper(std::shared_ptr metadata, - parquet::ArrowReaderProperties arrow_properties, - std::shared_ptr filter, std::vector row_groups) - : metadata_(std::move(metadata)), - arrow_properties_(std::move(arrow_properties)), - filter_(std::move(filter)), - row_group_idx_(0), - row_groups_(std::move(row_groups)), - num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups() - : static_cast(row_groups_.size())) {} - - int Next() { - while (row_group_idx_ < num_row_groups_) { - const int row_group = - row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++]; - - const auto row_group_metadata = metadata_->RowGroup(row_group); - - const int64_t num_rows = row_group_metadata->num_rows(); - if (CanSkip(*row_group_metadata)) { - rows_skipped_ += num_rows; - continue; - } - - return row_group; - } - - return kIterationDone; - } - - private: - bool CanSkip(const parquet::RowGroupMetaData& metadata) const { - auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_); - // Errors with statistics are ignored and post-filtering will apply. - if (!maybe_stats_expr.ok()) { - return false; - } - - auto stats_expr = maybe_stats_expr.ValueOrDie(); - return !filter_->Assume(stats_expr)->IsSatisfiable(); - } - - std::shared_ptr metadata_; - parquet::ArrowReaderProperties arrow_properties_; - std::shared_ptr filter_; - int row_group_idx_; - std::vector row_groups_; - int num_row_groups_; - int64_t rows_skipped_; -}; - class ParquetScanTaskIterator { public: static Result Make(std::shared_ptr options, std::shared_ptr context, - std::unique_ptr reader, - parquet::ArrowReaderProperties arrow_properties, - const std::vector& row_groups) { - auto metadata = reader->metadata(); - - auto column_projection = InferColumnProjection(*metadata, arrow_properties, options); - - std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), - arrow_properties, &arrow_reader)); - - RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), - options->filter, row_groups); - - return ScanTaskIterator(ParquetScanTaskIterator( - std::move(options), std::move(context), std::move(column_projection), - std::move(skipper), std::move(arrow_reader))); + FileSource source, + std::unique_ptr reader, + std::vector row_groups) { + auto column_projection = InferColumnProjection(*reader, *options); + return static_cast(ParquetScanTaskIterator( + std::move(options), std::move(context), std::move(source), std::move(reader), + std::move(column_projection), std::move(row_groups))); } Result> Next() { - auto row_group = skipper_.Next(); - - // Iteration is done. 
- if (row_group == RowGroupSkipper::kIterationDone) { + if (idx_ >= row_groups_.size()) { return nullptr; } + auto row_group = row_groups_[idx_++]; return std::shared_ptr( new ParquetScanTask(row_group, column_projection_, reader_, options_, context_)); } private: // Compute the column projection out of an optional arrow::Schema - static std::vector InferColumnProjection( - const parquet::FileMetaData& metadata, - const parquet::ArrowReaderProperties& arrow_properties, - const std::shared_ptr& options) { - auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties); - if (!maybe_manifest.ok()) { - return internal::Iota(metadata.num_columns()); - } - auto manifest = std::move(maybe_manifest).ValueOrDie(); - + static std::vector InferColumnProjection(const parquet::arrow::FileReader& reader, + const ScanOptions& options) { + auto manifest = reader.manifest(); // Checks if the field is needed in either the projection or the filter. - auto fields_name = options->MaterializedFields(); - std::unordered_set materialized_fields{fields_name.cbegin(), - fields_name.cend()}; + auto field_names = options.MaterializedFields(); + std::unordered_set materialized_fields{field_names.cbegin(), + field_names.cend()}; auto should_materialize_column = [&materialized_fields](const std::string& f) { return materialized_fields.find(f) != materialized_fields.end(); }; @@ -315,20 +234,28 @@ class ParquetScanTaskIterator { } ParquetScanTaskIterator(std::shared_ptr options, - std::shared_ptr context, - std::vector column_projection, RowGroupSkipper skipper, - std::unique_ptr reader) + std::shared_ptr context, FileSource source, + std::unique_ptr reader, + std::vector column_projection, + std::vector row_groups) : options_(std::move(options)), context_(std::move(context)), + source_(std::move(source)), + reader_(std::move(reader)), column_projection_(std::move(column_projection)), - skipper_(std::move(skipper)), - reader_(std::move(reader)) {} + row_groups_(std::move(row_groups)) {} std::shared_ptr options_; std::shared_ptr context_; - std::vector column_projection_; - RowGroupSkipper skipper_; + + FileSource source_; std::shared_ptr reader_; + + std::vector column_projection_; + std::vector row_groups_; + + // row group index. + size_t idx_ = 0; }; ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) { @@ -359,19 +286,29 @@ Result ParquetFileFormat::IsSupported(const FileSource& source) const { Result> ParquetFileFormat::Inspect( const FileSource& source) const { - auto properties = MakeReaderProperties(*this); + ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source)); + std::shared_ptr schema; + RETURN_NOT_OK(reader->GetSchema(&schema)); + return schema; +} + +Result> ParquetFileFormat::GetReader( + const FileSource& source, ScanOptions* options, ScanContext* context) const { + MemoryPool* pool = context ? 
context->pool : default_memory_pool(); + auto properties = MakeReaderProperties(*this, pool); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); - auto arrow_properties = - MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader); - std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader), - std::move(arrow_properties), - &arrow_reader)); + auto metadata = reader->metadata(); + auto arrow_properties = MakeArrowReaderProperties(*this, *metadata); - std::shared_ptr schema; - RETURN_NOT_OK(arrow_reader->GetSchema(&schema)); - return schema; + if (options) { + arrow_properties.set_batch_size(options->batch_size); + } + + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make( + pool, std::move(reader), std::move(arrow_properties), &arrow_reader)); + return std::move(arrow_reader); } Result ParquetFileFormat::ScanFile( @@ -380,77 +317,318 @@ Result ParquetFileFormat::ScanFile( return ScanFile(source, std::move(options), std::move(context), {}); } +static inline bool RowGroupInfosAreComplete(const std::vector& infos) { + return !infos.empty() && + std::all_of(infos.cbegin(), infos.cend(), + [](const RowGroupInfo& i) { return i.HasStatistics(); }); +} + +static inline std::vector FilterRowGroups( + std::vector row_groups, const Expression& predicate) { + auto filter = [&predicate](const RowGroupInfo& info) { + return !info.Satisfy(predicate); + }; + auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter); + row_groups.erase(end, row_groups.end()); + return row_groups; +} + +static inline Result> AugmentRowGroups( + std::vector row_groups, parquet::arrow::FileReader* reader) { + auto metadata = reader->parquet_reader()->metadata(); + auto manifest = reader->manifest(); + auto num_row_groups = metadata->num_row_groups(); + + if (row_groups.empty()) { + row_groups = RowGroupInfo::FromCount(num_row_groups); + } + + // Augment a RowGroup with statistics if missing. + auto augment = [&](RowGroupInfo& info) { + if (!info.HasStatistics() && info.id() < num_row_groups) { + auto row_group = metadata->RowGroup(info.id()); + info.set_num_rows(row_group->num_rows()); + info.set_statistics(RowGroupStatisticsAsExpression(*row_group, manifest)); + } + }; + std::for_each(row_groups.begin(), row_groups.end(), augment); + + return row_groups; +} + Result ParquetFileFormat::ScanFile( const FileSource& source, std::shared_ptr options, - std::shared_ptr context, const std::vector& row_groups) const { - auto properties = MakeReaderProperties(*this, context->pool); - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); + std::shared_ptr context, std::vector row_groups) const { + bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups); + // The following block is required to avoid any IO if all RowGroups are + // excluded due to prior statistics knowledge. + if (row_groups_are_complete) { + // Apply a pre-filtering if the user requested an explicit sub-set of + // row-groups. In the case where a RowGroup doesn't have statistics + // metdata, it will not be excluded. + row_groups = FilterRowGroups(std::move(row_groups), *options->filter); + if (row_groups.empty()) { + return MakeEmptyIterator>(); + } + } + + // Open the reader and pay the real IO cost. 
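+  // (The predicate is applied exactly once: either in the block above, when the
+  //  caller already supplied statistics for every RowGroup so no IO is needed,
+  //  or further below, after AugmentRowGroups has read the missing statistics
+  //  from the footer of the file opened here.)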
+ ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), context.get())); - for (int i : row_groups) { - if (i >= reader->metadata()->num_row_groups()) { - return Status::IndexError("trying to scan row group ", i, " but ", source.path(), - " only has ", reader->metadata()->num_row_groups(), + // Ensure RowGroups are indexing valid RowGroups before augmenting. + auto num_row_groups = reader->num_row_groups(); + for (const auto& row_group : row_groups) { + if (row_group.id() >= num_row_groups) { + return Status::IndexError("Trying to scan row group ", row_group.id(), " but ", + source.path(), " only has ", num_row_groups, " row groups"); } } - auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader); - return ParquetScanTaskIterator::Make(std::move(options), std::move(context), - std::move(reader), std::move(arrow_properties), - row_groups); + if (!row_groups_are_complete) { + ARROW_ASSIGN_OR_RAISE(row_groups, + AugmentRowGroups(std::move(row_groups), reader.get())); + row_groups = FilterRowGroups(std::move(row_groups), *options->filter); + } + + if (row_groups.empty()) { + return MakeEmptyIterator>(); + } + + return ParquetScanTaskIterator::Make(std::move(options), std::move(context), source, + std::move(reader), std::move(row_groups)); } Result> ParquetFileFormat::MakeFragment( FileSource source, std::shared_ptr partition_expression, - std::vector row_groups) { + std::vector row_groups) { return std::shared_ptr( new ParquetFileFragment(std::move(source), shared_from_this(), std::move(partition_expression), std::move(row_groups))); } +Result> ParquetFileFormat::MakeFragment( + FileSource source, std::shared_ptr partition_expression, + std::vector row_groups) { + return std::shared_ptr(new ParquetFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), + RowGroupInfo::FromIdentifiers(row_groups))); +} + Result> ParquetFileFormat::MakeFragment( FileSource source, std::shared_ptr partition_expression) { return std::shared_ptr(new ParquetFileFragment( std::move(source), shared_from_this(), std::move(partition_expression), {})); } -Result ParquetFileFormat::GetRowGroupFragments( - const ParquetFileFragment& fragment, std::shared_ptr filter) { - auto properties = MakeReaderProperties(*this); - ARROW_ASSIGN_OR_RAISE(auto reader, - OpenReader(fragment.source(), std::move(properties))); +/// +/// RowGroupInfo +/// - auto arrow_properties = - MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader); - auto metadata = reader->metadata(); - - auto row_groups = fragment.row_groups(); - if (row_groups.empty()) { - row_groups = internal::Iota(metadata->num_row_groups()); +std::vector RowGroupInfo::FromIdentifiers(const std::vector ids) { + std::vector results; + results.reserve(ids.size()); + for (auto i : ids) { + results.emplace_back(i); } - FragmentVector fragments(row_groups.size()); - - RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), - std::move(filter), std::move(row_groups)); + return results; +} - for (int i = 0, row_group = skipper.Next(); - row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) { - ARROW_ASSIGN_OR_RAISE( - fragments[i++], - MakeFragment(fragment.source(), fragment.partition_expression(), {row_group})); +std::vector RowGroupInfo::FromCount(int count) { + std::vector result; + result.reserve(count); + for (int i = 0; i < count; i++) { + result.emplace_back(i); } + return result; +} - return MakeVectorIterator(std::move(fragments)); +bool 
RowGroupInfo::Satisfy(const Expression& predicate) const { + return !HasStatistics() || predicate.IsSatisfiableWith(statistics_); } +/// +/// ParquetFileFragment +/// + +ParquetFileFragment::ParquetFileFragment(FileSource source, + std::shared_ptr format, + std::shared_ptr partition_expression, + std::vector row_groups) + : FileFragment(std::move(source), std::move(format), std::move(partition_expression)), + row_groups_(std::move(row_groups)), + parquet_format_(internal::checked_cast(*format_)), + has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {} + Result ParquetFileFragment::Scan(std::shared_ptr options, std::shared_ptr context) { - return parquet_format().ScanFile(source_, std::move(options), std::move(context), - row_groups_); + return parquet_format_.ScanFile(source_, std::move(options), std::move(context), + row_groups_); +} + +Result ParquetFileFragment::SplitByRowGroup( + const std::shared_ptr& predicate) { + std::vector row_groups; + if (HasCompleteMetadata()) { + row_groups = FilterRowGroups(row_groups_, *predicate); + } else { + ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_)); + ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, reader.get())); + row_groups = FilterRowGroups(std::move(row_groups), *predicate); + } + + FragmentVector fragments; + fragments.reserve(row_groups.size()); + for (auto&& row_group : row_groups) { + ARROW_ASSIGN_OR_RAISE(auto fragment, + parquet_format_.MakeFragment(source_, partition_expression(), + {std::move(row_group)})); + fragments.push_back(std::move(fragment)); + } + + return fragments; +} + +/// +/// ParquetDatasetFactory +/// + +ParquetDatasetFactory::ParquetDatasetFactory( + std::shared_ptr filesystem, std::shared_ptr format, + std::shared_ptr metadata, std::string base_path) + : filesystem_(std::move(filesystem)), + format_(std::move(format)), + metadata_(std::move(metadata)), + base_path_(std::move(base_path)) {} + +Result> ParquetDatasetFactory::Make( + const std::string& metadata_path, std::shared_ptr filesystem, + std::shared_ptr format) { + // Paths in ColumnChunk are relative to the `_metadata` file. Thus, the base + // directory of all parquet files is `dirname(metadata_path)`. + auto dirname = arrow::fs::internal::GetAbstractPathParent(metadata_path).first; + return Make({metadata_path, filesystem}, dirname, filesystem, format); } -const ParquetFileFormat& ParquetFileFragment::parquet_format() const { - return internal::checked_cast(*format_); +Result> ParquetDatasetFactory::Make( + const FileSource& metadata_source, const std::string& base_path, + std::shared_ptr filesystem, + std::shared_ptr format) { + DCHECK_NE(filesystem, nullptr); + DCHECK_NE(format, nullptr); + + ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source)); + auto metadata = reader->parquet_reader()->metadata(); + + return std::shared_ptr(new ParquetDatasetFactory( + std::move(filesystem), std::move(format), std::move(metadata), base_path)); +} + +Result>> ParquetDatasetFactory::InspectSchemas( + InspectOptions options) { + std::shared_ptr schema; + RETURN_NOT_OK(parquet::arrow::FromParquetSchema(metadata_->schema(), &schema)); + return std::vector>{schema}; +} + +static Result FileFromRowGroup(const std::string& base_path, + const parquet::RowGroupMetaData& row_group) { + try { + auto n_columns = row_group.num_columns(); + if (n_columns == 0) { + return Status::Invalid( + "Extracting file path from RowGroup failed. 
RowGroup must have a least one " + "columns to extract path"); + } + + auto first_column = row_group.ColumnChunk(0); + auto path = first_column->file_path(); + if (path == "") { + return Status::Invalid( + "Extracting file path from RowGroup failed. The column chunks " + "file path should be set, but got an empty file path."); + } + + for (int i = 1; i < n_columns; i++) { + auto column = row_group.ColumnChunk(i); + auto column_path = column->file_path(); + if (column_path != path) { + return Status::Invalid("Extracting file path from RowGroup failed. Path '", + column_path, "' not equal to path '", path, + ", for ColumnChunk at index ", i, + "; ColumnChunks in a RowGroup must have the same path."); + } + } + + return fs::internal::JoinAbstractPath(std::vector{base_path, path}); + } catch (const ::parquet::ParquetException& e) { + return Status::Invalid("Extracting file path from RowGroup failed. Parquet threw:", + e.what()); + } +} + +Result>> +ParquetDatasetFactory::CollectParquetFragments( + const parquet::FileMetaData& metadata, + const parquet::ArrowReaderProperties& properties) { + try { + auto n_columns = metadata.num_columns(); + if (n_columns == 0) { + return Status::Invalid( + "ParquetDatasetFactory must contain a schema with at least one column"); + } + + std::unordered_map> path_to_row_group_infos; + + ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties)); + + for (int i = 0; i < metadata.num_row_groups(); i++) { + auto row_group = metadata.RowGroup(i); + ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(base_path_, *row_group)); + // Normalizing path is required for Windows. + ARROW_ASSIGN_OR_RAISE(path, filesystem_->NormalizePath(std::move(path))); + auto stats = RowGroupStatisticsAsExpression(*row_group, manifest); + auto num_rows = row_group->num_rows(); + + // Insert the path, or increase the count of row groups. It will be + // assumed that the RowGroup of a file are ordered exactly like in + // the metadata file. 
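+      // (For example -- the file names here are purely illustrative -- if the
+      //  _metadata file lists three RowGroups whose ColumnChunks point to
+      //  "part-0.parquet" followed by two pointing to "part-1.parquet", the
+      //  first path receives RowGroupInfo ids 0..2 and the second ids 0..1,
+      //  in encounter order.)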
+ auto elem_and_inserted = + path_to_row_group_infos.insert({path, {{0, num_rows, stats}}}); + if (!elem_and_inserted.second) { + auto& path_and_count = *elem_and_inserted.first; + auto& row_groups = path_and_count.second; + auto row_group_id = static_cast(row_groups.size()); + path_and_count.second.emplace_back(row_group_id, num_rows, stats); + } + } + + std::vector> fragments; + fragments.reserve(path_to_row_group_infos.size()); + for (auto&& elem : path_to_row_group_infos) { + ARROW_ASSIGN_OR_RAISE(auto fragment, + format_->MakeFragment({std::move(elem.first), filesystem_}, + scalar(true), std::move(elem.second))); + fragments.push_back(std::move(fragment)); + } + + return fragments; + } catch (const ::parquet::ParquetException& e) { + return Status::Invalid("Could not infer file paths from FileMetaData:", e.what()); + } +} + +Result> ParquetDatasetFactory::Finish(FinishOptions options) { + std::shared_ptr schema = options.schema; + bool schema_missing = schema == nullptr; + if (schema_missing) { + ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options)); + } + + auto properties = MakeArrowReaderProperties(*format_, *metadata_); + ARROW_ASSIGN_OR_RAISE(auto fragments, CollectParquetFragments(*metadata_, properties)); + return FileSystemDataset::Make(std::move(schema), scalar(true), format_, + std::move(fragments)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 574e9b0824c..6ac8a1b49c1 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -25,6 +25,7 @@ #include #include +#include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -36,11 +37,16 @@ class FileMetaData; class FileDecryptionProperties; class ReaderProperties; class ArrowReaderProperties; +namespace arrow { +class FileReader; +}; // namespace arrow } // namespace parquet namespace arrow { namespace dataset { +class RowGroupInfo; + /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: @@ -97,53 +103,177 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result ScanFile(const FileSource& source, std::shared_ptr options, std::shared_ptr context, - const std::vector& row_groups) const; + std::vector row_groups) const; using FileFormat::MakeFragment; + /// \brief Create a Fragment, restricted to the specified row groups. Result> MakeFragment( - FileSource source, std::shared_ptr partition_expression) override; + FileSource source, std::shared_ptr partition_expression, + std::vector row_groups); - /// \brief Create a Fragment, restricted to the specified row groups. Result> MakeFragment( FileSource source, std::shared_ptr partition_expression, std::vector row_groups); - /// \brief Split a ParquetFileFragment into a Fragment for each row group. + /// \brief Create a Fragment targeting all RowGroups. + Result> MakeFragment( + FileSource source, std::shared_ptr partition_expression) override; + + /// \brief Return a FileReader on the given source. + Result> GetReader( + const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const; +}; + +/// \brief Represents a parquet's RowGroup with extra information. +class ARROW_DS_EXPORT RowGroupInfo : public util::EqualityComparable { + public: + RowGroupInfo() : RowGroupInfo(-1) {} + + /// \brief Construct a RowGroup from an identifier. 
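+  ///
+  /// For example, `RowGroupInfo(3)` designates the fourth RowGroup of a file;
+  /// its row count and statistics can be attached later with `set_num_rows()`
+  /// and `set_statistics()`.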
+  explicit RowGroupInfo(int id) : RowGroupInfo(id, -1, NULLPTR) {}
+
+  /// \brief Construct a RowGroup from an identifier with statistics.
+  RowGroupInfo(int id, int64_t num_rows, std::shared_ptr statistics)
+      : id_(id), num_rows_(num_rows), statistics_(std::move(statistics)) {}
+
+  /// \brief Transform a vector of identifiers into a vector of RowGroupInfos
+  static std::vector FromIdentifiers(const std::vector ids);
+  static std::vector FromCount(int count);
+
+  /// \brief Return the RowGroup's identifier (index in the file).
+  int id() const { return id_; }
+
+  /// \brief Return the RowGroup's number of rows.
   ///
-  /// \param[in] fragment to split
-  /// \param[in] filter expression that will ignore RowGroup that can't satisfy
-  ///            the filter.
+  /// If statistics are not provided, return -1.
+  int64_t num_rows() const { return num_rows_; }
+  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
+
+  /// \brief Return the RowGroup's statistics
+  const std::shared_ptr& statistics() const { return statistics_; }
+  void set_statistics(std::shared_ptr statistics) {
+    statistics_ = std::move(statistics);
+  }
+
+  /// \brief Indicate if statistics are set.
+  bool HasStatistics() const { return statistics_ != NULLPTR; }
+
+  /// \brief Indicate if the RowGroup's statistics satisfy the predicate.
   ///
-  /// \return An iterator of fragment.
-  Result GetRowGroupFragments(
-      const ParquetFileFragment& fragment,
-      std::shared_ptr filter = scalar(true));
+  /// This will return true if the RowGroup was not initialized with statistics
+  /// (rather than silently reading metadata for a complete check).
+  bool Satisfy(const Expression& predicate) const;
+
+  /// \brief Indicate if the other RowGroup points to the same RowGroup.
+  bool Equals(const RowGroupInfo& other) const { return id() == other.id(); }
+
+ private:
+  int id_;
+  int64_t num_rows_;
+  std::shared_ptr statistics_;
+};

+/// \brief A FileFragment with parquet logic.
+///
+/// ParquetFileFragment provides a lazy (with respect to IO) interface to
+/// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
+///
+/// The caller can provide an optional list of selected RowGroups to limit the
+/// number of scanned RowGroups, or to partition the scans across multiple
+/// threads.
+///
+/// The caller can also attach optional statistics to each RowGroup, providing
+/// pushdown predicate benefits before invoking any heavy IO. This can yield a
+/// significant performance boost when scanning high-latency file systems.
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
   Result Scan(std::shared_ptr options,
               std::shared_ptr context) override;

-  /// \brief The row groups viewed by this Fragment. This may be empty which signifies all
-  /// row groups are selected.
-  const std::vector& row_groups() const { return row_groups_; }
+  Result SplitByRowGroup(const std::shared_ptr& predicate);
+
+  /// \brief Return the RowGroups selected by this fragment. An empty list
+  /// represents all RowGroups in the parquet file.
+  const std::vector& row_groups() const { return row_groups_; }
+
+  /// \brief Indicate if the attached statistics are complete.
+  ///
+  /// The statistics are complete if the provided RowGroups (see `row_groups()`)
+  /// are not empty and all RowGroups return true on `RowGroupInfo::HasStatistics()`.
+ bool HasCompleteMetadata() const { return has_complete_metadata_; } private: ParquetFileFragment(FileSource source, std::shared_ptr format, std::shared_ptr partition_expression, - std::vector row_groups) - : FileFragment(std::move(source), std::move(format), - std::move(partition_expression)), - row_groups_(std::move(row_groups)) {} + std::vector row_groups); - const ParquetFileFormat& parquet_format() const; - - std::vector row_groups_; + std::vector row_groups_; + ParquetFileFormat& parquet_format_; + bool has_complete_metadata_; friend class ParquetFileFormat; }; +/// \brief Create FileSystemDataset from custom `_metadata` cache file. +/// +/// Dask and other systems will generate a cache metadata file by concatenating +/// the RowGroupMetaData of multiple parquet files into a single parquet file +/// that only contains metadata and no ColumnChunk data. +/// +/// ParquetDatasetFactory creates a FileSystemDataset composed of +/// ParquetFileFragment where each fragment is pre-populated with the exact +/// number of row groups and statistics for each columns. +class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory { + public: + /// \brief Create a ParquetDatasetFactory from a metadata path. + /// + /// The `metadata_path` will be read from `filesystem`. Each RowGroup + /// contained in the metadata file will be relative to `dirname(metadata_path)`. + /// + /// \param[in] metadata_path path of the metadata parquet file + /// \param[in] filesystem from which to open/read the path + /// \param[in] format to read the file with. + static Result> Make( + const std::string& metadata_path, std::shared_ptr filesystem, + std::shared_ptr format); + + /// \brief Create a ParquetDatasetFactory from a metadata source. + /// + /// Similar to the previous Make definition, but the metadata can be a Buffer + /// and the base_path is explicited instead of inferred from the metadata + /// path. + /// + /// \param[in] metadata source to open the metadata parquet file from + /// \param[in] base_path used as the prefix of every parquet files referenced + /// \param[in] filesystem from which to read the files referenced. + /// \param[in] format to read the file with. 
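+
+  /// A minimal end-to-end sketch using the path-based overload above (the path
+  /// and the `fs` filesystem object are illustrative):
+  ///
+  /// \code
+  ///   auto format = std::make_shared<ParquetFileFormat>();
+  ///   ARROW_ASSIGN_OR_RAISE(auto factory,
+  ///                         ParquetDatasetFactory::Make("/data/_metadata", fs, format));
+  ///   ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
+  /// \endcode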
+ static Result> Make( + const FileSource& metadata, const std::string& base_path, + std::shared_ptr filesystem, + std::shared_ptr format); + + Result>> InspectSchemas( + InspectOptions options) override; + + Result> Finish(FinishOptions options) override; + + protected: + ParquetDatasetFactory(std::shared_ptr fs, + std::shared_ptr format, + std::shared_ptr metadata, + std::string base_path); + + std::shared_ptr filesystem_; + std::shared_ptr format_; + std::shared_ptr metadata_; + std::string base_path_; + + private: + Result>> CollectParquetFragments( + const parquet::FileMetaData& metadata, + const parquet::ArrowReaderProperties& properties); +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 70b0f02fcf0..7cb81c21038 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -183,18 +183,16 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { std::vector expected_row_groups, const Expression& filter) { auto parquet_fragment = checked_pointer_cast(fragment); - ASSERT_OK_AND_ASSIGN(auto row_group_fragments, - format_->GetRowGroupFragments(*parquet_fragment, filter.Copy())); + ASSERT_OK_AND_ASSIGN(auto fragments, parquet_fragment->SplitByRowGroup(filter.Copy())) - auto expected_row_group = expected_row_groups.begin(); - for (auto maybe_fragment : row_group_fragments) { - ASSERT_OK_AND_ASSIGN(auto fragment, std::move(maybe_fragment)); - auto parquet_fragment = checked_pointer_cast(fragment); + EXPECT_EQ(fragments.size(), expected_row_groups.size()); + for (size_t i = 0; i < fragments.size(); i++) { + auto expected = expected_row_groups[i]; + auto parquet_fragment = checked_pointer_cast(fragments[i]); - auto i = *expected_row_group++; - EXPECT_EQ(parquet_fragment->row_groups(), std::vector{i}); - - EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), i + 1); + EXPECT_EQ(parquet_fragment->row_groups(), + RowGroupInfo::FromIdentifiers({expected})); + EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1); } } @@ -434,7 +432,6 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) { TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { constexpr int64_t kNumRowGroups = 16; - constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2; auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups); auto source = GetFileSource(reader.get()); @@ -442,7 +439,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { opts_ = ScanOptions::Make(reader->schema()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - CountRowGroupsInFragment(fragment, internal::Iota(static_cast(kTotalNumRows)), + CountRowGroupsInFragment(fragment, internal::Iota(static_cast(kNumRowGroups)), *scalar(true)); for (int i = 0; i < kNumRowGroups; ++i) { @@ -466,7 +463,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast(kNumRowGroups)), "i64"_ >= int64_t(6)); - CountRowGroupsInFragment(fragment, {5, 6, 7}, + CountRowGroupsInFragment(fragment, {5, 6}, "i64"_ >= int64_t(6) and "i64"_ < int64_t(8)); } @@ -492,7 +489,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { // individual selection selects a single row group for (int i = 0; i < kNumRowGroups; ++i) { CountRowsAndBatchesInScan(row_groups_fragment({i}), i + 1, 1); - EXPECT_EQ(row_groups_fragment({i})->row_groups(), 
std::vector{i}); + EXPECT_EQ(row_groups_fragment({i})->row_groups(), RowGroupInfo::FromIdentifiers({i})); } for (int i = 0; i < kNumRowGroups; ++i) { diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index fa6a4a5535b..9d476f5e9a5 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -198,6 +198,10 @@ class ARROW_DS_EXPORT Expression { /// /// This behaves like IsSatisfiable, but it simplifies the current expression /// with the given `other` information. + bool IsSatisfiableWith(const Expression& other) const { + return Assume(other)->IsSatisfiable(); + } + bool IsSatisfiableWith(const std::shared_ptr& other) const { return Assume(other)->IsSatisfiable(); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 97b2daf9e61..22075241eaa 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector( return flattened; } +struct TableAssemblyState { + /// Protecting mutating accesses to batches + std::mutex mutex{}; + std::vector batches{}; + + void Emplace(RecordBatchVector b, size_t position) { + std::lock_guard lock(mutex); + if (batches.size() <= position) { + batches.resize(position + 1); + } + batches[position] = std::move(b); + } +}; + Result> Scanner::ToTable() { ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan()); auto task_group = scan_context_->TaskGroup(); - // Protecting mutating accesses to batches - std::mutex mutex; - std::vector batches; + /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't + /// invalidate concurrently running tasks when Finish() early returns + /// and the mutex/batches fail out of scope. + auto state = std::make_shared(); + size_t scan_task_id = 0; for (auto maybe_scan_task : scan_task_it) { ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task)); auto id = scan_task_id++; - task_group->Append([&batches, &mutex, id, scan_task] { + task_group->Append([state, id, scan_task] { ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); - ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector()); - - { - // Move into global batches. 
- std::lock_guard lock(mutex); - if (batches.size() <= id) { - batches.resize(id + 1); - } - batches[id] = std::move(local); - } - + state->Emplace(std::move(local), id); return Status::OK(); }); } @@ -211,7 +217,7 @@ Result> Scanner::ToTable() { RETURN_NOT_OK(task_group->Finish()); return Table::FromRecordBatches(scan_options_->schema(), - FlattenRecordBatchVector(std::move(batches))); + FlattenRecordBatchVector(std::move(state->batches))); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 75a45618244..cc9ae3abad8 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -449,8 +449,8 @@ struct ArithmeticDatasetFixture { std::stringstream ss; ss << "[\n"; - for (int64_t i = 0; i < n; i++) { - if (i != 0) { + for (int64_t i = 1; i <= n; i++) { + if (i != 1) { ss << "\n,"; } ss << record; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 129ee844db9..2a3cf11f182 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -280,6 +280,10 @@ class FileReaderImpl : public FileReader { reader_properties_.set_use_threads(use_threads); } + const ArrowReaderProperties& properties() const override { return reader_properties_; } + + const SchemaManifest& manifest() const override { return manifest_; } + Status ScanContents(std::vector columns, const int32_t column_batch_size, int64_t* num_rows) override { BEGIN_PARQUET_CATCH_EXCEPTIONS diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 832054854a4..858ff4d4297 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -46,6 +46,7 @@ namespace arrow { class ColumnChunkReader; class ColumnReader; +struct SchemaManifest; class RowGroupReader; /// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches. @@ -211,6 +212,10 @@ class PARQUET_EXPORT FileReader { /// By default only one thread is used. 
   virtual void set_use_threads(bool use_threads) = 0;
+
+  virtual const ArrowReaderProperties& properties() const = 0;
+
+  virtual const SchemaManifest& manifest() const = 0;
+
   virtual ~FileReader() = default;
 };
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 1536fb4745c..aa0a28a8a51 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -747,8 +747,7 @@ Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
       using CType = typename StatisticsType::T;
       return MakeMinMaxScalar(statistics, min, max);
     default:
-      return Status::NotImplemented("Cannot extract statistics for type ",
-                                    logical_type->ToString());
+      return Status::NotImplemented("Cannot extract statistics for type ");

   return Status::OK();
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 8f424aa68ce..9d21c62cae1 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -652,6 +652,10 @@ class FileMetaData::FileMetaDataImpl {
   }

   void AppendRowGroups(const std::unique_ptr& other) {
+    if (!schema()->Equals(*other->schema())) {
+      throw ParquetException("AppendRowGroups requires equal schemas.");
+    }
+
     format::RowGroup other_rg;
     for (int i = 0; i < other->num_row_groups(); i++) {
       other_rg = other->row_group(i);
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index c1539f5d3d8..0fd5f94cc62 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -258,10 +258,26 @@ class PARQUET_EXPORT FileMetaData {
   const std::shared_ptr& key_value_metadata() const;

-  // Set file_path ColumnChunk fields to a particular value
+  /// \brief Set the file path of all ColumnChunks in all RowGroups.
+  ///
+  /// Commonly used by systems (Dask, Spark) that generate a metadata-only
+  /// parquet file. The path is usually relative to said index file.
+  ///
+  /// \param[in] path to set.
   void set_file_path(const std::string& path);

-  // Merge row-group metadata from "other" FileMetaData object
+  /// \brief Merge row groups from another metadata file into this one.
+  ///
+  /// The schema of the input FileMetaData must be equal to the
+  /// schema of this object.
+  ///
+  /// This is used by systems that create an aggregate metadata-only file by
+  /// concatenating the row groups of multiple files. This newly created
+  /// metadata file acts as an index of all available row groups.
+  ///
+  /// \param[in] other FileMetaData to merge the row groups from.
+  ///
+  /// \throws ParquetException if schemas are not equal.
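+  ///
+  /// A rough usage sketch (variable names are illustrative): per-file metadata
+  /// is typically tagged with its path and then folded into the aggregate
+  /// `_metadata` object:
+  ///
+  /// \code
+  ///   part_metadata->set_file_path("part-1.parquet");
+  ///   merged_metadata->AppendRowGroups(*part_metadata);  // throws if schemas differ
+  /// \endcode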
void AppendRowGroups(const FileMetaData& other); private: diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 2190009f0bb..1eba6abb5fc 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -758,6 +758,42 @@ cdef class FileFragment(Fragment): return FileFormat.wrap(self.file_fragment.format()) +cdef class RowGroupInfo: + """A wrapper class for RowGroup information""" + + cdef: + CRowGroupInfo info + + def __init__(self, int id): + cdef CRowGroupInfo info = CRowGroupInfo(id) + self.init(info) + + cdef void init(self, CRowGroupInfo info): + self.info = info + + @staticmethod + cdef wrap(CRowGroupInfo info): + cdef RowGroupInfo self = RowGroupInfo.__new__(RowGroupInfo) + self.init(info) + return self + + @property + def id(self): + return self.info.id() + + @property + def num_rows(self): + return self.info.num_rows() + + def __eq__(self, other): + if not isinstance(other, RowGroupInfo): + return False + cdef: + RowGroupInfo row_group = other + CRowGroupInfo c_info = row_group.info + return self.info.Equals(c_info) + + cdef class ParquetFileFragment(FileFragment): """A Fragment representing a parquet file.""" @@ -770,32 +806,42 @@ cdef class ParquetFileFragment(FileFragment): @property def row_groups(self): - row_groups = set(self.parquet_file_fragment.row_groups()) - if len(row_groups) != 0: - return row_groups - return None + cdef: + vector[CRowGroupInfo] c_row_groups + c_row_groups = self.parquet_file_fragment.row_groups() + if c_row_groups.empty(): + return None + return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups] - def get_row_group_fragments(self, Expression extra_filter=None): + def split_by_row_group(self, Expression predicate=None): """ + Split the fragment into multiple fragments. + Yield a Fragment wrapping each row group in this ParquetFileFragment. - Row groups will be excluded whose metadata contradicts the either the - filter provided on construction of this Fragment or the extra_filter - argument. + Row groups will be excluded whose metadata contradicts the optional + predicate. + + Parameters + ---------- + predicate : Expression, default None + Exclude RowGroups whose statistics contradicts the predicate. + + Returns + ------- + A list of Fragment. """ cdef: - CParquetFileFormat* c_format - CFragmentIterator c_fragments - shared_ptr[CExpression] c_extra_filter + vector[shared_ptr[CFragment]] c_fragments + shared_ptr[CExpression] c_predicate + shared_ptr[CFragment] c_fragment schema = self.physical_schema - c_extra_filter = _insert_implicit_casts(extra_filter, schema) - c_format = self.file_fragment.format().get() - c_fragments = move(GetResultValue(c_format.GetRowGroupFragments(deref( - self.parquet_file_fragment), move(c_extra_filter)))) - - for maybe_fragment in c_fragments: - yield Fragment.wrap(GetResultValue(move(maybe_fragment))) + c_predicate = _insert_implicit_casts(predicate, schema) + with nogil: + c_fragments = move(GetResultValue( + self.parquet_file_fragment.SplitByRowGroup(move(c_predicate)))) + return [Fragment.wrap(c_fragment) for c_fragment in c_fragments] cdef class ParquetReadOptions: """ @@ -1446,6 +1492,47 @@ cdef class UnionDatasetFactory(DatasetFactory): self.union_factory = sp.get() +cdef class ParquetDatasetFactory(DatasetFactory): + """ + Create a ParquetDatasetFactory from a Parquet `_metadata` file. + + Parameters + ---------- + metadata_path : str + Path to the `_metadata` parquet metadata-only file generated with + `pyarrow.parquet.write_metadata`. 
+    filesystem : pyarrow.fs.FileSystem
+        Filesystem to read the metadata_path from, and subsequent parquet
+        files.
+    format : ParquetFileFormat
+        Parquet format options.
+    """
+
+    cdef:
+        CParquetDatasetFactory* parquet_factory
+
+    def __init__(self, metadata_path, FileSystem filesystem not None,
+                 FileFormat format not None):
+        cdef:
+            c_string c_path
+            shared_ptr[CFileSystem] c_filesystem
+            shared_ptr[CParquetFileFormat] c_format
+            CResult[shared_ptr[CDatasetFactory]] result
+
+        c_path = tobytes(metadata_path)
+        c_filesystem = filesystem.unwrap()
+        c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
+            format.unwrap())
+
+        result = CParquetDatasetFactory.MakeFromMetaDataPath(
+            c_path, c_filesystem, c_format)
+        self.init(GetResultValue(result))
+
+    cdef init(self, shared_ptr[CDatasetFactory]& sp):
+        DatasetFactory.init(self, sp)
+        self.parquet_factory = sp.get()
+
+
 cdef class ScanTask:
     """Read record batches from a range of a single data fragment.
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 2b370b33b55..5b1317eb0b4 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -312,7 +312,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         int num_schema_elements()

         void set_file_path(const c_string& path)
-        void AppendRowGroups(const CFileMetaData& other)
+        void AppendRowGroups(const CFileMetaData& other) except +

         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 189d02ab759..8c5825080d0 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -35,11 +35,13 @@
     Fragment,
     HivePartitioning,
     IpcFileFormat,
+    ParquetDatasetFactory,
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
+    RowGroupInfo,
     Scanner,
     ScanTask,
     UnionDataset,
@@ -443,6 +445,52 @@ def _union_dataset(children, schema=None, **kwargs):
     return UnionDataset(schema, children)


+def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None):
+    """
+    Create a FileSystemDataset from a `_metadata` file created via
+    `pyarrow.parquet.write_metadata`.
+
+    Parameters
+    ----------
+    metadata_path : path
+        Path pointing to a single-file parquet metadata file.
+    schema : Schema, optional
+        Optionally provide the Schema for the Dataset, in which case it will
+        not be inferred from the source.
+    filesystem : FileSystem or URI string, default None
+        If a single path is given as source and filesystem is None, then the
+        filesystem will be inferred from the path.
+        If a URI string is passed, then a filesystem object is constructed
+        using the URI's optional path component as a directory prefix. See the
+        examples below.
+        Note that the URIs on Windows must follow 'file:///C:...' or
+        'file:/C:...' patterns.
+    format : ParquetFileFormat
+        An instance of a ParquetFileFormat if special options need to be
+        passed.
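+
+    For example, assuming a ``_metadata`` file was written with
+    `pyarrow.parquet.write_metadata` (the path below is illustrative)::
+
+        dataset = parquet_dataset('/path/to/dataset/_metadata')
+        table = dataset.to_table()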
+ + Returns + ------- + FileSystemDataset + """ + from pyarrow.fs import LocalFileSystem + + if format is None: + format = ParquetFileFormat() + elif not isinstance(format, ParquetFileFormat): + raise ValueError("format argument must be a ParquetFileFormat") + + if filesystem is None: + filesystem = LocalFileSystem() + else: + filesystem, _ = _ensure_filesystem(filesystem) + + metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path)) + + factory = ParquetDatasetFactory(metadata_path, filesystem, format) + return factory.finish(schema) + + def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 04938928492..56140bde5a5 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -209,9 +209,20 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: const CFileSource& source() const const shared_ptr[CFileFormat]& format() const + cdef cppclass CRowGroupInfo "arrow::dataset::RowGroupInfo": + CRowGroupInfo() + CRowGroupInfo(int id) + CRowGroupInfo( + int id, int64_t n_rows, shared_ptr[CExpression] statistics) + int id() const + int64_t num_rows() const + bint Equals(const CRowGroupInfo& other) + cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"( CFileFragment): - const vector[int]& row_groups() const + const vector[CRowGroupInfo]& row_groups() const + CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup( + shared_ptr[CExpression] predicate) cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): @@ -234,9 +245,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( CFileFormat): CParquetFileFormatReaderOptions reader_options - CResult[CFragmentIterator] GetRowGroupFragments( - const CParquetFileFragment&, - shared_ptr[CExpression] extra_filter) CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, shared_ptr[CExpression] partition_expression, @@ -313,3 +321,20 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CFileFormat] format, CFileSystemFactoryOptions options ) + + cdef cppclass CParquetDatasetFactory \ + "arrow::dataset::ParquetDatasetFactory"(CDatasetFactory): + @staticmethod + CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataPath "Make"( + const c_string& metadata_path, + shared_ptr[CFileSystem] filesystem, + shared_ptr[CParquetFileFormat] format + ) + + @staticmethod + CResult[shared_ptr[CDatasetFactory]] MakeFromMetaDataSource "Make"( + const CFileSource& metadata_path, + const c_string& base_path, + shared_ptr[CFileSystem] filesystem, + shared_ptr[CParquetFileFormat] format + ) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 51542ee28ef..a71a844071a 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1742,33 +1742,53 @@ def write_to_dataset(table, root_path, partition_cols=None, metadata_collector[-1].set_file_path(outfile) -def write_metadata(schema, where, version='1.0', - use_deprecated_int96_timestamps=False, - coerce_timestamps=None): +def write_metadata(schema, where, metadata_collector=None, **kwargs): """ - Write metadata-only Parquet file from schema. + Write metadata-only Parquet file from schema. 
This can be used with + `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar + files. Parameters ---------- schema : pyarrow.Schema where: string or pyarrow.NativeFile - version : {"1.0", "2.0"}, default "1.0" - The Parquet format version, defaults to 1.0. - use_deprecated_int96_timestamps : bool, default False - Write nanosecond resolution timestamps to INT96 Parquet format. - coerce_timestamps : str, default None - Cast timestamps a particular resolution. - Valid values: {None, 'ms', 'us'}. - filesystem : FileSystem, default None - If nothing passed, paths assumed to be found in the local on-disk - filesystem. + metadata_collector: + **kwargs : dict, + Additional kwargs for ParquetWriter class. See docstring for + `ParquetWriter` for more information. + + Examples + -------- + + Write a dataset and collect metadata information. + + >>> metadata_collector = [] + >>> write_to_dataset( + ... table, root_path, + ... metadata_collector=metadata_collector, **writer_kwargs) + + Write the `_common_metadata` parquet file without row groups statistics. + + >>> write_metadata( + ... table.schema, root_path / '_common_metadata', **writer_kwargs) + + Write the `_metadata` parquet file with row groups statistics. + + >>> write_metadata( + ... table.schema, root_path / '_metadata', + ... metadata_collector=metadata_collector, **writer_kwargs) """ - writer = ParquetWriter( - where, schema, version=version, - use_deprecated_int96_timestamps=use_deprecated_int96_timestamps, - coerce_timestamps=coerce_timestamps) + writer = ParquetWriter(where, schema, **kwargs) writer.close() + if metadata_collector is not None: + # ParquetWriter doesn't expose the metadata until it's written. Write + # it and read it again. + metadata = read_metadata(where) + for m in metadata_collector: + metadata.append_row_groups(m) + metadata.write_metadata_file(where) + def read_metadata(where, memory_map=False): """ diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index df512f83153..a7c7a1119c8 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -250,11 +250,11 @@ def test_filesystem_dataset(mockfs): assert isinstance(fragment, ds.ParquetFileFragment) assert fragment.row_groups is None - row_group_fragments = list(fragment.get_row_group_fragments()) + row_group_fragments = list(fragment.split_by_row_group()) assert len(row_group_fragments) == 1 assert isinstance(fragment, ds.ParquetFileFragment) assert row_group_fragments[0].path == path - assert row_group_fragments[0].row_groups == {0} + assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)] fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) assert len(fragments) == 2 @@ -552,7 +552,7 @@ def test_make_fragment(multisourcefs): assert f.path == path assert isinstance(f.filesystem, type(multisourcefs)) assert fragment.row_groups is None - assert row_group_fragment.row_groups == {0} + assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)] def _create_dataset_for_fragments(tempdir, chunk_size=None): @@ -621,18 +621,19 @@ def test_fragments_implicit_cast(tempdir): assert len(list(fragments)) == 1 -@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_reconstruct(tempdir): table, dataset = _create_dataset_for_fragments(tempdir) - def assert_yields_projected(fragment, row_slice, schema): - actual = fragment.to_table(schema=schema) - assert actual.schema == schema.schema + def 
assert_yields_projected(fragment, row_slice, + columns=None, filter=None): + actual = fragment.to_table( + schema=table.schema, columns=columns, filter=filter) + column_names = columns if columns else table.column_names + assert actual.column_names == column_names - names = schema.names - expected = table.slice(*row_slice).to_pandas()[[*names]] + expected = table.slice(*row_slice).to_pandas()[[*column_names]] assert actual.equals(pa.Table.from_pandas(expected)) fragment = list(dataset.get_fragments())[0] @@ -643,38 +644,37 @@ def assert_yields_projected(fragment, row_slice, schema): fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression) assert new_fragment.to_table().equals(fragment.to_table()) - assert_yields_projected(new_fragment, (0, 4), table.column_names) + assert_yields_projected(new_fragment, (0, 4)) # filter / column projection, inspected schema new_fragment = parquet_format.make_fragment( fragment.path, fragment.filesystem, - columns=['f1'], filter=ds.field('f1') < 2, partition_expression=fragment.partition_expression) - assert_yields_projected(new_fragment, (0, 2), ['f1']) + assert_yields_projected(new_fragment, (0, 2), filter=ds.field('f1') < 2) # filter requiring cast / column projection, inspected schema new_fragment = parquet_format.make_fragment( fragment.path, fragment.filesystem, - columns=['f1'], filter=ds.field('f1') < 2.0, partition_expression=fragment.partition_expression) - assert_yields_projected(new_fragment, (0, 2), ['f1']) + assert_yields_projected(new_fragment, (0, 2), + columns=['f1'], filter=ds.field('f1') < 2.0) - # filter on the partition column, explicit schema + # filter on the partition column new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, - filter=ds.field('part') == 'a', + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression) - assert_yields_projected(new_fragment, (0, 4), table.column_names) + assert_yields_projected(new_fragment, (0, 4), + filter=ds.field('part') == 'a') - # filter on the partition column, inspected schema + # Fragments don't contain the partition's columns if not provided to the + # `to_table(schema=...)` method. 
with pytest.raises(ValueError, match="Field named 'part' not found"): new_fragment = parquet_format.make_fragment( fragment.path, fragment.filesystem, - filter=ds.field('part') == 'a', partition_expression=fragment.partition_expression) + new_fragment.to_table(filter=ds.field('part') == 'a') -@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups(tempdir): @@ -683,7 +683,7 @@ def test_fragments_parquet_row_groups(tempdir): fragment = list(dataset.get_fragments())[0] # list and scan row group fragments - row_group_fragments = list(fragment.get_row_group_fragments()) + row_group_fragments = list(fragment.split_by_row_group()) assert len(row_group_fragments) == 2 result = row_group_fragments[0].to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] @@ -691,13 +691,12 @@ def test_fragments_parquet_row_groups(tempdir): assert result.equals(table.slice(0, 2)) fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0] - row_group_fragments = list(fragment.get_row_group_fragments()) + row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1)) assert len(row_group_fragments) == 1 result = row_group_fragments[0].to_table(filter=ds.field('f1') < 1) assert len(result) == 1 -@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups_reconstruct(tempdir): @@ -705,7 +704,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): fragment = list(dataset.get_fragments())[0] parquet_format = fragment.format - row_group_fragments = list(fragment.get_row_group_fragments()) + row_group_fragments = list(fragment.split_by_row_group()) # manually re-construct row group fragments new_fragment = parquet_format.make_fragment( @@ -720,7 +719,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups={1}) - result = new_fragment.to_table(columns=['f1', 'part'], + result = new_fragment.to_table(schema=table.schema, columns=['f1', 'part'], filter=ds.field('f1') < 3, ) assert result.column_names == ['f1', 'part'] assert len(result) == 1 @@ -730,7 +729,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups={2}) - with pytest.raises(IndexError, match="trying to scan row group 2"): + with pytest.raises(IndexError, match="Trying to scan row group 2"): new_fragment.to_table() @@ -1441,3 +1440,101 @@ def test_feather_format(tempdir): write_feather(table, str(basedir / "data1.feather"), version=1) with pytest.raises(ValueError): ds.dataset(basedir, format="feather").to_table() + + +def _create_parquet_dataset_simple(root_path): + import pyarrow.parquet as pq + + metadata_collector = [] + + for i in range(4): + table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) + pq.write_to_dataset( + table, str(root_path), metadata_collector=metadata_collector + ) + + metadata_path = str(root_path / '_metadata') + # write _metadata file + pq.write_metadata( + table.schema, metadata_path, + metadata_collector=metadata_collector + ) + return metadata_path, table + + +@pytest.mark.parquet +@pytest.mark.pandas # write_to_dataset currently requires pandas +def test_parquet_dataset_factory(tempdir): + root_path = tempdir / "test_parquet_dataset" + metadata_path, table = _create_parquet_dataset_simple(root_path) + dataset = 
ds.parquet_dataset(metadata_path) + assert dataset.schema.equals(table.schema) + assert len(dataset.files) == 4 + result = dataset.to_table() + assert result.num_rows == 40 + + +@pytest.mark.parquet +@pytest.mark.pandas +def test_parquet_dataset_factory_invalid(tempdir): + root_path = tempdir / "test_parquet_dataset_invalid" + metadata_path, table = _create_parquet_dataset_simple(root_path) + # remove one of the files + list(root_path.glob("*.parquet"))[0].unlink() + dataset = ds.parquet_dataset(metadata_path) + assert dataset.schema.equals(table.schema) + assert len(dataset.files) == 4 + with pytest.raises(FileNotFoundError): + dataset.to_table() + + +def _create_metadata_file(root_path): + # create _metadata file from existing parquet dataset + import pyarrow.parquet as pq + + parquet_paths = list(sorted(root_path.rglob("*.parquet"))) + schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema() + + metadata_collector = [] + for path in parquet_paths: + metadata = pq.ParquetFile(path).metadata + metadata.set_file_path(str(path.relative_to(root_path))) + metadata_collector.append(metadata) + + metadata_path = root_path / "_metadata" + pq.write_metadata( + schema, metadata_path, metadata_collector=metadata_collector + ) + return metadata_path + + +def _create_parquet_dataset_partitioned(root_path): + import pyarrow.parquet as pq + + table = pa.table({ + 'f1': range(20), 'f2': np.random.randn(20), + 'part': np.repeat(['a', 'b'], 10)} + ) + pq.write_to_dataset(table, str(root_path), partition_cols=['part']) + return _create_metadata_file(root_path), table + + +@pytest.mark.parquet +@pytest.mark.pandas +def test_parquet_dataset_factory_partitioned(tempdir): + # TODO support for specifying partitioning scheme + + root_path = tempdir / "test_parquet_dataset_factory_partitioned" + metadata_path, table = _create_parquet_dataset_partitioned(root_path) + + dataset = ds.parquet_dataset(metadata_path) + # TODO partition column not yet included + # assert dataset.schema.equals(table.schema) + assert len(dataset.files) == 2 + result = dataset.to_table() + assert result.num_rows == 20 + + # the partitioned dataset does not preserve order + result = result.to_pandas().sort_values("f1").reset_index(drop=True) + expected = table.to_pandas().drop(columns=["part"]) + pd.testing.assert_frame_equal(result, expected)
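
For reviewers who want to exercise the new `_metadata` workflow end to end, the test helpers above translate into roughly the following. This is a minimal sketch based on `_create_parquet_dataset_simple` and `test_parquet_dataset_factory` in this patch, assuming the Python APIs it introduces (`write_metadata(..., metadata_collector=...)` and `ds.parquet_dataset(...)`); the path and column names are illustrative only, not part of the patch.

    import numpy as np
    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    root_path = "test_parquet_dataset"   # illustrative local directory

    # Write a few Parquet files, collecting per-file metadata as we go
    # (write_to_dataset appends a FileMetaData per written file and sets
    # its file path relative to root_path).
    metadata_collector = []
    for i in range(4):
        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
        pq.write_to_dataset(table, root_path,
                            metadata_collector=metadata_collector)

    # Aggregate the collected row group metadata into a `_metadata` sidecar.
    pq.write_metadata(table.schema, root_path + "/_metadata",
                      metadata_collector=metadata_collector)

    # Open the dataset straight from `_metadata`; no directory discovery runs.
    dataset = ds.parquet_dataset(root_path + "/_metadata")
    assert dataset.to_table().num_rows == 40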
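
The renamed fragment API (`get_row_group_fragments` becomes `split_by_row_group`, and `row_groups` now holds `RowGroupInfo` objects rather than plain ints) can be tried on the same dataset. Again a sketch continuing from the snippet above, mirroring what `test_fragments_parquet_row_groups` does; the predicate is illustrative.

    # Split a file fragment into per-row-group fragments, optionally pruning
    # row groups with a predicate evaluated against their statistics.
    fragment = next(iter(dataset.get_fragments()))

    for rg_fragment in fragment.split_by_row_group(ds.field('f1') < 1):
        # Each resulting fragment carries RowGroupInfo objects, not ints.
        print(rg_fragment.path, rg_fragment.row_groups)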