Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,11 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset
/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: createScanner
* Signature: (J[Ljava/lang/String;JJ)J
* Signature: (J[Ljava/lang/String;JLjava/lang/String;J)J
*/
JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jlong batch_size,
jlong memory_pool_id) {
jstring filter, jlong memory_pool_id) {
JNI_METHOD_START
arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
if (pool == nullptr) {
Expand All @@ -418,6 +418,11 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
std::vector<std::string> column_vector = ToStringVector(env, columns);
JniAssertOkOrThrow(scanner_builder->Project(column_vector));
}
if (filter != nullptr) {
arrow::compute::Expression filter_expr = JniGetOrThrow(
arrow::compute::Expression::FromString(JStringToCString(env, filter)));
JniAssertOkOrThrow(scanner_builder->Filter(filter_expr));
}
JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));

auto scanner = JniGetOrThrow(scanner_builder->Finish());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private JniWrapper() {
* @param memoryPool identifier of memory pool used in the native scanner.
* @return the native pointer of the arrow::dataset::Scanner instance.
*/
public native long createScanner(long datasetId, String[] columns, long batchSize, long memoryPool);
public native long createScanner(long datasetId, String[] columns, long batchSize, String filter, long memoryPool);

/**
* Get a serialized schema from native instance of a Scanner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public synchronized NativeScanner newScan(ScanOptions options) {
throw new NativeInstanceReleasedException();
}
long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null),
options.getBatchSize(), context.getMemoryPool().getNativeInstanceId());
options.getBatchSize(), options.getFilter().orElse(null), context.getMemoryPool().getNativeInstanceId());
return new NativeScanner(context, scannerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
public class ScanOptions {
private final Optional<String[]> columns;
private final Optional<String> filter;
private final long batchSize;

/**
Expand All @@ -43,7 +44,7 @@ public ScanOptions(String[] columns, long batchSize) {
return null;
}
return present;
}));
}), Optional.empty());
}

/**
Expand All @@ -52,19 +53,24 @@ public ScanOptions(String[] columns, long batchSize) {
* @param columns (Optional) Projected columns. {@link Optional#empty()} for scanning all columns. Otherwise,
* Only columns present in the Array will be scanned.
*/
public ScanOptions(long batchSize, Optional<String[]> columns) {
public ScanOptions(long batchSize, Optional<String[]> columns, Optional<String> filter) {
Preconditions.checkNotNull(filter)
Preconditions.checkNotNull(columns);
this.batchSize = batchSize;
this.columns = columns;
}

public ScanOptions(long batchSize) {
this(batchSize, Optional.empty());
this(batchSize, Optional.empty(), Optional.empty());
}

public Optional<String[]> getColumns() {
return columns;
}

public Optional<String> getFilter() {
return filter;
}

public long getBatchSize() {
return batchSize;
Expand Down