Skip to content

Commit

Permalink
Add support for batch_size in the ORC Scanner (Dataset) (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhixingheyi-tian authored Nov 22, 2021
1 parent cded559 commit a1575c8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
37 changes: 36 additions & 1 deletion cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,37 @@ class ORCFileReader::Impl {
return Status::OK();
}

Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
std::shared_ptr<RecordBatchReader>* out) {
if (current_row_ >= NumberOfRows()) {
out->reset();
return Status::OK();
}

liborc::RowReaderOptions opts;
if (!include_names.empty()) {
RETURN_NOT_OK(SelectNames(&opts, include_names));
}
StripeInformation stripe_info({0, 0, 0, 0});
RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info));
std::shared_ptr<Schema> schema;
RETURN_NOT_OK(ReadSchema(opts, &schema));
std::unique_ptr<liborc::RowReader> row_reader;

ORC_BEGIN_CATCH_NOT_OK
row_reader = reader_->createRowReader(opts);
row_reader->seekToRow(current_row_);
current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
ORC_END_CATCH_NOT_OK

*out = std::shared_ptr<RecordBatchReader>(
new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_));
return Status::OK();
}

Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
return NextStripeReader(batch_size, {}, out);
std::vector<int> empty_vec;
return NextStripeReader(batch_size, empty_vec, out);
}

private:
Expand Down Expand Up @@ -538,6 +567,12 @@ Status ORCFileReader::NextStripeReader(int64_t batch_size,
return impl_->NextStripeReader(batch_size, include_indices, out);
}

Status ORCFileReader::NextStripeReader(int64_t batch_size,
const std::vector<std::string>& include_names,
std::shared_ptr<RecordBatchReader>* out) {
return impl_->NextStripeReader(batch_size, include_names, out);
}

int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }

int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ class ARROW_EXPORT ORCFileReader {
Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
std::shared_ptr<RecordBatchReader>* out);

/// \brief Get a stripe level record batch iterator with specified row count
/// in each record batch. NextStripeReader serves as a fine grain
/// alternative to ReadStripe which may cause OOM issue by loading
/// the whole stripes into memory.
///
/// \param[in] batch_size Get a stripe level record batch iterator with specified row
/// count in each record batch.
///
/// \param[in] include_indices the selected field names to read
/// \param[out] out the returned stripe reader
Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
std::shared_ptr<RecordBatchReader>* out);

/// \brief The number of stripes in the file
int64_t NumberOfStripes();

Expand Down
18 changes: 7 additions & 11 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,20 @@ class OrcScanTask : public ScanTask {
included_fields.push_back(name);
}

std::shared_ptr<RecordBatchReader> recordBatchReader;
reader->NextStripeReader(scan_options.batch_size, included_fields, &recordBatchReader);

return RecordBatchIterator(
Impl{std::move(reader), 0, num_stripes, included_fields});
Impl{std::move(recordBatchReader)});
}

Result<std::shared_ptr<RecordBatch>> Next() {
if (i_ == num_stripes_) {
return nullptr;
}
std::shared_ptr<RecordBatch> batch;
// TODO (https://issues.apache.org/jira/browse/ARROW-14153)
// pass scan_options_->batch_size
return reader_->ReadStripe(i_++, included_fields_);
RETURN_NOT_OK(recordBatchReader_->ReadNext(&batch));
return batch;
}

std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader_;
int i_;
int num_stripes_;
std::vector<std::string> included_fields_;
std::shared_ptr<RecordBatchReader> recordBatchReader_;
};

return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(),
Expand Down

0 comments on commit a1575c8

Please sign in to comment.