diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 402d017d8815a..d7449c7127180 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -267,7 +267,7 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner { : Scanner(std::move(scan_options)), fragment_(std::move(fragment)) {} Result ScanBatches() override; - Result ScanBatchesWithWeakFilter() override; + Result ScanBatchesWithWeakFilterProject() override; Result Scan() override; Status Scan(std::function visitor) override; Result> ToTable() override; @@ -298,21 +298,11 @@ Result SyncScanner::ScanBatches() { }); } -Result SyncScanner::ScanBatchesWithWeakFilter() { +Result SyncScanner::ScanBatchesWithWeakFilterProject() { ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments()) auto fn = [this](const std::shared_ptr& fragment) -> Result { ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(scan_options_)); - - auto partition = fragment->partition_expression(); - // Apply the projection to incoming RecordBatches by - // wrapping the ScanTask with a FilterAndProjectScanTask, - // ignore filters. 
- auto wrap_scan_task = - [partition](std::shared_ptr task) -> std::shared_ptr { - return std::make_shared(std::move(task), partition); - }; - - return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); + return std::move(scan_task_it); }; // Iterator> @@ -395,7 +385,7 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner, Status Scan(std::function visitor) override; Result ScanBatches() override; - Result ScanBatchesWithWeakFilter() override; + Result ScanBatchesWithWeakFilterProject() override; Result ScanBatchesUnordered() override; Result> ToTable() override; @@ -518,8 +508,8 @@ Result AsyncScanner::ScanBatches() { return MakeGeneratorIterator(std::move(batches_gen)); } -Result AsyncScanner::ScanBatchesWithWeakFilter() { - return Status::NotImplemented("Scanning with weak filter not implemented in async scanner"); +Result AsyncScanner::ScanBatchesWithWeakFilterProject() { + return Status::NotImplemented("Scanning with weak filter project not implemented in async scanner"); } Result AsyncScanner::ScanBatchesUnordered() { diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e7eb20c12179b..6469e54666616 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -274,7 +274,10 @@ class ARROW_DS_EXPORT Scanner { /// much in memory (this is goverended by the readahead options and use_threads option). /// If the readahead queue fills up then I/O will pause until the calling thread catches /// up. - virtual Result ScanBatchesWithWeakFilter() = 0; + /// + /// Filter and Project expressions in ScanOption will not be executed individually and + /// will only be recognized by the underlying file format. + virtual Result ScanBatchesWithWeakFilterProject() = 0; /// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this /// method may allow record batches to be returned out of order. 
This allows for more /// efficient scanning: some fragments may be accessed more quickly than others (e.g. diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 60675c8d8feef..a56670badc693 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -130,7 +130,7 @@ class DisposableScannerAdaptor { static arrow::Result> Create( std::shared_ptr scanner) { - ARROW_ASSIGN_OR_RAISE(auto batch_itr, scanner->ScanBatchesWithWeakFilter()) + ARROW_ASSIGN_OR_RAISE(auto batch_itr, scanner->ScanBatchesWithWeakFilterProject()) return std::make_shared(scanner, std::move(batch_itr)); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 0ceec3b8f1ebf..bf381e6dfd3c6 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -43,7 +43,7 @@ public synchronized NativeScanner newScan(ScanOptions options) { long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns(), options.getFilter().toByteArray(), options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); - return new NativeScanner(context, scannerId); + return new NativeScanner(context, options, scannerId); } @Override diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index ea2c9edf4ec30..a8374f1deb3e8 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -25,8 +25,10 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.arrow.dataset.scanner.ScanOptions; import 
org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.SchemaUtility; @@ -40,6 +42,7 @@ public class NativeScanner implements Scanner { private final AtomicBoolean executed = new AtomicBoolean(false); private final NativeContext context; + private final ScanOptions options; private final long scannerId; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -47,8 +50,9 @@ public class NativeScanner implements Scanner { private final Lock readLock = lock.readLock(); private boolean closed = false; - public NativeScanner(NativeContext context, long scannerId) { + public NativeScanner(NativeContext context, ScanOptions options, long scannerId) { this.context = context; + this.options = options; this.scannerId = scannerId; } @@ -87,6 +91,9 @@ public boolean hasNext() { return false; } peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes); + if (options.getColumns() != null) { + Preconditions.checkState(peek.getNodes().size() == options.getColumns().length); + } return true; } diff --git a/java/dataset/src/test/resources/data/people-comma.csv b/java/dataset/src/test/resources/data/people-comma.csv new file mode 100644 index 0000000000000..4d9b27bf9ac8e --- /dev/null +++ b/java/dataset/src/test/resources/data/people-comma.csv @@ -0,0 +1,3 @@ +name,age,job +Jorge,30,Developer +Bob,32,Developer diff --git a/java/dataset/src/test/resources/data/people-tab.csv b/java/dataset/src/test/resources/data/people-tab.csv new file mode 100644 index 0000000000000..e7ef9fa488c38 --- /dev/null +++ b/java/dataset/src/test/resources/data/people-tab.csv @@ -0,0 +1,3 @@ +name age job +Jorge 30 Developer +Bob 32 Developer