From ab913d8bf0d8511cbe083782e56472005bf71991 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 3 Nov 2021 19:45:09 +0800 Subject: [PATCH] Revert "Avoid whole partition cache (#38)" (#40) This reverts commit 8d7e25031b3884ea66d99f3c77e921a11f343c9c. --- cpp/src/arrow/dataset/file_parquet.cc | 1 - cpp/src/arrow/dataset/scanner.cc | 28 +++++++++++++++------------ cpp/src/arrow/dataset/scanner.h | 2 +- cpp/src/jni/dataset/jni_wrapper.cc | 8 ++++---- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index e9395c408562f..7e24d905a0470 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -51,7 +51,6 @@ using parquet::arrow::SchemaField; using parquet::arrow::SchemaManifest; using parquet::arrow::StatisticsAsScalars; - /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file. class ParquetScanTask : public ScanTask { public: diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index e45259de89838..d7449c7127180 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -22,7 +22,6 @@ #include #include #include -#include #include "arrow/array/array_primitive.h" #include "arrow/compute/api_scalar.h" @@ -268,7 +267,7 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner { : Scanner(std::move(scan_options)), fragment_(std::move(fragment)) {} Result ScanBatches() override; - Result ScanBatchesWithWeakFilterProject() override; + Result ScanBatchesWithWeakFilterProject() override; Result Scan() override; Status Scan(std::function visitor) override; Result> ToTable() override; @@ -299,23 +298,28 @@ Result SyncScanner::ScanBatches() { }); } -Result SyncScanner::ScanBatchesWithWeakFilterProject() { +Result SyncScanner::ScanBatchesWithWeakFilterProject() { ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments()) auto fn = [this](const std::shared_ptr& fragment) -> Result { ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(scan_options_)); return std::move(scan_task_it); }; + // Iterator> auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragment_it)); auto scan_task_it = MakeFlattenIterator(std::move(maybe_scantask_it)); - - auto scan_fn = [](const std::shared_ptr& aTask)->Result { - ARROW_ASSIGN_OR_RAISE(auto recordbatch_it, aTask->Execute()); - return std::move(recordbatch_it); - }; - auto maybe_recordbatch_it = MakeMaybeMapIterator(scan_fn, std::move(scan_task_it)); - return MakeFlattenIterator(std::move(maybe_recordbatch_it)); + auto task_group = scan_options_->TaskGroup(); + auto state = std::make_shared(std::move(scan_task_it), task_group); + for (int i = 0; i < scan_options_->fragment_readahead; i++) { + state->PushScanTask(); + } + return MakeFunctionIterator([task_group, state]() -> Result { + ARROW_ASSIGN_OR_RAISE(auto batch, state->Pop()); + if (!IsIterationEnd(batch)) return batch; + RETURN_NOT_OK(task_group->Finish()); + return IterationEnd(); + }); } Result SyncScanner::GetFragments() { @@ -381,7 +385,7 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner, Status Scan(std::function visitor) override; Result ScanBatches() override; - Result ScanBatchesWithWeakFilterProject() override; + Result ScanBatchesWithWeakFilterProject() override; Result ScanBatchesUnordered() override; Result> ToTable() override; @@ -504,7 +508,7 @@ Result AsyncScanner::ScanBatches() { return MakeGeneratorIterator(std::move(batches_gen)); } -Result AsyncScanner::ScanBatchesWithWeakFilterProject() { +Result AsyncScanner::ScanBatchesWithWeakFilterProject() { return Status::NotImplemented("Scanning with weak filter project not implemented in async scanner"); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index fff36b67e92bb..6469e54666616 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -277,7 +277,7 @@ class ARROW_DS_EXPORT Scanner { /// /// Filter and Project expressions in ScanOption will be not executed individually and /// will be only recognized by the underlying file format. - virtual Result ScanBatchesWithWeakFilterProject() = 0; + virtual Result ScanBatchesWithWeakFilterProject() = 0; /// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this /// method may allow record batches to be returned out of order. This allows for more /// efficient scanning: some fragments may be accessed more quickly than others (e.g. diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index baf916b29ca9f..a56670badc693 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -125,7 +125,7 @@ class ReserveFromJava : public arrow::jniutil::ReservationListener { class DisposableScannerAdaptor { public: DisposableScannerAdaptor(std::shared_ptr scanner, - arrow::RecordBatchIterator batch_itr) + arrow::dataset::TaggedRecordBatchIterator batch_itr) : scanner_(std::move(scanner)), batch_itr_(std::move(batch_itr)) {} static arrow::Result> Create( @@ -136,18 +136,18 @@ class DisposableScannerAdaptor { arrow::Result> Next() { ARROW_ASSIGN_OR_RAISE(std::shared_ptr batch, NextBatch()); - return std::move(batch); + return batch; } const std::shared_ptr& GetScanner() const { return scanner_; } private: std::shared_ptr scanner_; - arrow::RecordBatchIterator batch_itr_; + arrow::dataset::TaggedRecordBatchIterator batch_itr_; arrow::Result> NextBatch() { ARROW_ASSIGN_OR_RAISE(auto batch, batch_itr_.Next()) - return batch; + return batch.record_batch; } };