
Conversation


@wForget wForget commented Sep 9, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

codecov-commenter commented Sep 9, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 57.44%. Comparing base (f09f8af) to head (cc529cb).
⚠️ Report is 501 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2350      +/-   ##
============================================
+ Coverage     56.12%   57.44%   +1.31%     
- Complexity      976     1297     +321     
============================================
  Files           119      147      +28     
  Lines         11743    13418    +1675     
  Branches       2251     2349      +98     
============================================
+ Hits           6591     7708    +1117     
- Misses         4012     4447     +435     
- Partials       1140     1263     +123     


@wForget wForget force-pushed the COMET-2298 branch 2 times, most recently from eefa69f to 1aa1d47, on September 9, 2025 10:10
@wForget wForget marked this pull request as ready for review September 11, 2025 08:01
with:
artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
features: ${{ matrix.features.value }}
maven_opts: "-Dtest=none -Dfeatures=${{ matrix.features.value }}"
Member Author

`-Dtest=none` skips the Java tests

}

def featureEnabled(feature: String): Boolean = {
  System.getProperty("feature", "").split(",").contains(feature)
}
Member Author

A temporary solution before #2372
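As a self-contained illustration of this temporary gate (the object name and the example property value below are hypothetical, not part of the PR):

```scala
object FeatureGate {
  // Mirrors the helper above: read the comma-separated "feature" system
  // property (e.g. set via -Dfeature=hdfs,jemalloc) and check membership.
  def featureEnabled(feature: String): Boolean =
    System.getProperty("feature", "").split(",").contains(feature)
}
```

Suites that need a native feature can then guard themselves with `if (featureEnabled("hdfs")) ...` until a configuration-based approach such as #2372 lands.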

Contributor

Should we wait for #2372?

wForget commented Sep 11, 2025

@parthchandra Could you please take a look?

{
val testFilePath = dir.toString
writeTestParquetFile(testFilePath)
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
Contributor

IMO we should also check `native_iceberg_compat`

Member Author

Thanks, I will add the `native_iceberg_compat` mode.

}

-  ignore("test native_datafusion scan on fake fs") {
+  test("test native_datafusion scan on fake fs") {
Contributor

Should we also test the `hdfs` feature and `native_iceberg_compat` mode?

Member Author

@wForget wForget Sep 11, 2025

> should we also test hdfs feature

The `hdfs` feature does not seem to support non-hdfs schemes yet, as noted in #2360 (comment).

fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
    match Url::parse(path) {
        Ok(url) => match url.scheme() {
            LOCAL_FS_SCHEME => Ok("file:///".to_string()),
            HDFS_FS_SCHEME | VIEW_FS_SCHEME => {
                if let Some(host) = url.host() {
                    let mut uri_builder = String::new();
                    write!(&mut uri_builder, "{}://{}", url.scheme(), host).unwrap();
                    if let Some(port) = url.port() {
                        write!(&mut uri_builder, ":{port}").unwrap();
                    }
                    Ok(uri_builder)
                } else {
                    Err(HdfsErr::InvalidUrl(path.to_string()))
                }
            }
            _ => Err(HdfsErr::InvalidUrl(path.to_string())),
        },
        Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),
    }
}
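To see why a `fake://` path is rejected here, the scheme dispatch above can be mirrored in Scala (a hypothetical sketch using `java.net.URI`, not the actual fs-hdfs API):

```scala
import java.net.URI

object NamenodeUri {
  // Hypothetical mirror of get_namenode_uri: only file://, hdfs:// and
  // viewfs:// schemes yield a namenode URI; any other scheme is rejected.
  def getNamenodeUri(path: String): Either[String, String] = {
    val url = new URI(path)
    url.getScheme match {
      case "file" => Right("file:///")
      case "hdfs" | "viewfs" =>
        Option(url.getHost) match {
          case Some(host) =>
            // getPort returns -1 when the URI has no explicit port
            val port = if (url.getPort >= 0) s":${url.getPort}" else ""
            Right(s"${url.getScheme}://$host$port")
          case None => Left(s"invalid URL: $path")
        }
      case _ => Left(s"invalid URL: $path") // e.g. fake:// falls through here
    }
  }
}
```

Only `file`, `hdfs`, and `viewfs` schemes produce a namenode URI, so a custom scheme like `fake://` would need the fix tracked in datafusion-contrib/fs-hdfs#29.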

Contributor

That's correct, yes

Contributor

That's one of the fixes we wanted in fs-hdfs: datafusion-contrib/fs-hdfs#29

Contributor

Also, I think at some point it might be more reasonable to use OpenDAL instead of fs-hdfs (#2367)

Contributor

@parthchandra parthchandra left a comment

Thanks for this PR @wForget. Should we wait to merge #2327 before we merge this?

wForget commented Sep 12, 2025

> Thanks for this PR @wForget. Should we wait to merge #2327 before we merge this?

Thanks for the review. Do you mean #2372? If so, I think either way is fine — the current temporary solution won’t break CI.

@parthchandra
Contributor

oops. Yes, I did mean #2372. Let's get that merged and update the test in this PR.

@parthchandra
Contributor

Should we also be adding the test suites to pr_build_macos.yml?

wForget commented Sep 13, 2025

> Should we also be adding the test suites to pr_build_macos.yml?

Thanks, I will add it.

Contributor

@comphead comphead left a comment

LGTM, thanks @wForget, but we should probably wait for #2372.

wForget commented Sep 16, 2025

Looks like a related failure: https://github.com/apache/datafusion-comet/actions/runs/17752053952/job/50448541640?pr=2350. I can't reproduce it, so I filed issue #2400.

- test native_datafusion scan on fake fs *** FAILED *** (8 seconds, 838 milliseconds)
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 6.0 failed 1 times, most recent failure: Lost task 2.0 in stage 6.0 (TID 15) (localhost executor driver): org.apache.spark.SparkException: Encountered error while reading file fake://fake-bucket/Users/runner/work/datafusion-comet/datafusion-comet/spark/target/tmp/comet_fake_3afe6cec-86bd-48ff-97a1-d354fec360327728661446900846429/data/test-file.parquet/part-00001-82d42ceb-6226-4ed6-8b34-d7108fc0b695-c000.snappy.parquet. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:864)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:296)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.comet.CometScanExec$$anon$1.hasNext(CometScanExec.scala:271)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:60)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:165)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:164)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:212)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:164)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:162)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:213)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:60)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:165)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:164)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:212)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:164)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:162)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:213)
	at org.apache.spark.sql.comet.execution.shuffle.CometNativeShuffleWriter.write(CometNativeShuffleWriter.scala:106)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.comet.CometNativeException: External: Generic IoError error: Unexpected (permanent) at , context: { expect: 8, actual: 0 } => reader got too little data
	at org.apache.comet.parquet.Native.readNextRecordBatch(Native Method)
	at org.apache.comet.parquet.NativeBatchReader.loadNextBatch(NativeBatchReader.java:812)
	at org.apache.comet.parquet.NativeBatchReader.nextBatch(NativeBatchReader.java:749)
	at org.apache.comet.parquet.NativeBatchReader.nextKeyValue(NativeBatchReader.java:707)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:286)
	... 36 more

@parthchandra
Contributor

I tried this PR earlier along with #2372 and did not see this failure either. I'll try again to see if I can reproduce it.

Contributor

@parthchandra parthchandra left a comment

LGTM, but I cannot reproduce the failure in CI. Can anyone else try this?
