diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index cd200dda45e51..6548786ce2bdd 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -438,8 +438,37 @@ class ORCFileReader::Impl { return Status::OK(); } + Status NextStripeReader(int64_t batch_size, const std::vector& include_names, + std::shared_ptr* out) { /* Builds an OrcStripeReader positioned at current_row_; a non-empty include_names is forwarded to SelectNames, an empty one applies no name selection. */ + if (current_row_ >= NumberOfRows()) { /* current_row_ is at or past NumberOfRows(): reset the output pointer and return OK */ + out->reset(); + return Status::OK(); + } + + liborc::RowReaderOptions opts; + if (!include_names.empty()) { + RETURN_NOT_OK(SelectNames(&opts, include_names)); + } + StripeInformation stripe_info({0, 0, 0, 0}); + RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info)); + std::shared_ptr schema; + RETURN_NOT_OK(ReadSchema(opts, &schema)); + std::unique_ptr row_reader; + + ORC_BEGIN_CATCH_NOT_OK + row_reader = reader_->createRowReader(opts); + row_reader->seekToRow(current_row_); + current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows; /* move current_row_ to first_row_of_stripe + num_rows so the next call starts after the rows described by stripe_info */ + ORC_END_CATCH_NOT_OK + + *out = std::shared_ptr( + new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_)); + return Status::OK(); + } + Status NextStripeReader(int64_t batch_size, std::shared_ptr* out) { - return NextStripeReader(batch_size, {}, out); + std::vector empty_vec; /* NOTE(review): a named empty vector replaces the braced {} argument, presumably because {} became ambiguous once both the include_indices and include_names overloads exist - confirm */ + return NextStripeReader(batch_size, empty_vec, out); } private: @@ -538,6 +567,12 @@ Status ORCFileReader::NextStripeReader(int64_t batch_size, return impl_->NextStripeReader(batch_size, include_indices, out); } +Status ORCFileReader::NextStripeReader(int64_t batch_size, + const std::vector& include_names, + std::shared_ptr* out) { /* Public entry point for the include_names overload; delegates to impl_. */ + return impl_->NextStripeReader(batch_size, include_names, out); +} + int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); } int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); } diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 0f84a9501ca71..e87fdf6fc6054 100644 
--- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -164,6 +164,19 @@ class ARROW_EXPORT ORCFileReader { Status NextStripeReader(int64_t batch_size, const std::vector& include_indices, std::shared_ptr* out); + /// \brief Get a stripe level record batch iterator with specified row count + /// in each record batch. NextStripeReader serves as a fine-grained + /// alternative to ReadStripe, which may cause OOM issues by loading + /// whole stripes into memory. + /// + /// \param[in] batch_size the row count of each record batch yielded by the + /// returned stripe reader + /// + /// \param[in] include_names the selected field names to read + /// \param[out] out the returned stripe reader + Status NextStripeReader(int64_t batch_size, const std::vector& include_names, + std::shared_ptr* out); + /// \brief The number of stripes in the file int64_t NumberOfStripes(); diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index cf4b2db4f545a..541c99fb0b338 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -84,24 +84,20 @@ class OrcScanTask : public ScanTask { included_fields.push_back(name); } + std::shared_ptr recordBatchReader; + reader->NextStripeReader(scan_options.batch_size, included_fields, &recordBatchReader); /* NOTE(review): the Status returned by NextStripeReader is discarded here - confirm whether it should be checked */ + return RecordBatchIterator( - Impl{std::move(reader), 0, num_stripes, included_fields}); + Impl{std::move(recordBatchReader)}); } Result> Next() { - if (i_ == num_stripes_) { - return nullptr; - } std::shared_ptr batch; - // TODO (https://issues.apache.org/jira/browse/ARROW-14153) - // pass scan_options_->batch_size - return reader_->ReadStripe(i_++, included_fields_); + RETURN_NOT_OK(recordBatchReader_->ReadNext(&batch)); /* NOTE(review): recordBatchReader_ comes from the single NextStripeReader call above; confirm that stripes after the first are still reached and that the pointer cannot be null here */ + return batch; } - std::unique_ptr reader_; - int i_; - int num_stripes_; - std::vector included_fields_; + std::shared_ptr recordBatchReader_; }; return Impl::Make(source_, *checked_pointer_cast(fragment_)->format(),