Skip to content

Commit

Permalink
Dataset: Add API to ignore both filter and project after scanning da…
Browse files Browse the repository at this point in the history
…ta from file format (#35)

* Dataset: Add API to ignore both filter and project after scanning data from file format

* Fixup

* Fixup
  • Loading branch information
zhztheplayer authored Oct 8, 2021
1 parent 609f033 commit 861538d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 20 deletions.
22 changes: 6 additions & 16 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner {
: Scanner(std::move(scan_options)), fragment_(std::move(fragment)) {}

Result<TaggedRecordBatchIterator> ScanBatches() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilter() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<ScanTaskIterator> Scan() override;
Status Scan(std::function<Status(TaggedRecordBatch)> visitor) override;
Result<std::shared_ptr<Table>> ToTable() override;
Expand Down Expand Up @@ -298,21 +298,11 @@ Result<TaggedRecordBatchIterator> SyncScanner::ScanBatches() {
});
}

Result<TaggedRecordBatchIterator> SyncScanner::ScanBatchesWithWeakFilter() {
Result<TaggedRecordBatchIterator> SyncScanner::ScanBatchesWithWeakFilterProject() {
ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments())
auto fn = [this](const std::shared_ptr<Fragment>& fragment) -> Result<ScanTaskIterator> {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(scan_options_));

auto partition = fragment->partition_expression();
// Apply the projection to incoming RecordBatches by
// wrapping each ScanTask in a ProjectScanTask;
// filters are ignored.
auto wrap_scan_task =
[partition](std::shared_ptr<ScanTask> task) -> std::shared_ptr<ScanTask> {
return std::make_shared<ProjectScanTask>(std::move(task), partition);
};

return MakeMapIterator(wrap_scan_task, std::move(scan_task_it));
return std::move(scan_task_it);
};

// Iterator<Iterator<ScanTask>>
Expand Down Expand Up @@ -395,7 +385,7 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,

Status Scan(std::function<Status(TaggedRecordBatch)> visitor) override;
Result<TaggedRecordBatchIterator> ScanBatches() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilter() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered() override;
Result<std::shared_ptr<Table>> ToTable() override;

Expand Down Expand Up @@ -518,8 +508,8 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
return MakeGeneratorIterator(std::move(batches_gen));
}

Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatchesWithWeakFilter() {
return Status::NotImplemented("Scanning with weak filter not implemented in async scanner");
Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatchesWithWeakFilterProject() {
return Status::NotImplemented("Scanning with weak filter project not implemented in async scanner");
}

Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,10 @@ class ARROW_DS_EXPORT Scanner {
/// much in memory (this is governed by the readahead options and use_threads option).
/// If the readahead queue fills up then I/O will pause until the calling thread catches
/// up.
virtual Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilter() = 0;
///
/// Filter and Project expressions in ScanOptions will not be executed individually and
/// will only be recognized by the underlying file format.
virtual Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() = 0;
/// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
/// method may allow record batches to be returned out of order. This allows for more
/// efficient scanning: some fragments may be accessed more quickly than others (e.g.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/jni/dataset/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class DisposableScannerAdaptor {

static arrow::Result<std::shared_ptr<DisposableScannerAdaptor>> Create(
std::shared_ptr<arrow::dataset::Scanner> scanner) {
ARROW_ASSIGN_OR_RAISE(auto batch_itr, scanner->ScanBatchesWithWeakFilter())
ARROW_ASSIGN_OR_RAISE(auto batch_itr, scanner->ScanBatchesWithWeakFilterProject())
return std::make_shared<DisposableScannerAdaptor>(scanner, std::move(batch_itr));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public synchronized NativeScanner newScan(ScanOptions options) {
long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns(),
options.getFilter().toByteArray(), options.getBatchSize(),
context.getMemoryPool().getNativeInstanceId());
return new NativeScanner(context, scannerId);
return new NativeScanner(context, options, scannerId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.SchemaUtility;
Expand All @@ -40,15 +42,17 @@ public class NativeScanner implements Scanner {

private final AtomicBoolean executed = new AtomicBoolean(false);
private final NativeContext context;
private final ScanOptions options;
private final long scannerId;

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock writeLock = lock.writeLock();
private final Lock readLock = lock.readLock();
private boolean closed = false;

public NativeScanner(NativeContext context, long scannerId) {
public NativeScanner(NativeContext context, ScanOptions options, long scannerId) {
this.context = context;
this.options = options;
this.scannerId = scannerId;
}

Expand Down Expand Up @@ -87,6 +91,9 @@ public boolean hasNext() {
return false;
}
peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes);
if (options.getColumns() != null) {
Preconditions.checkState(peek.getNodes().size() == options.getColumns().length);
}
return true;
}

Expand Down
3 changes: 3 additions & 0 deletions java/dataset/src/test/resources/data/people-comma.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name,age,job
Jorge,30,Developer
Bob,32,Developer
3 changes: 3 additions & 0 deletions java/dataset/src/test/resources/data/people-tab.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name age job
Jorge 30 Developer
Bob 32 Developer

0 comments on commit 861538d

Please sign in to comment.