diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml
index 1f7899f8ac..24af6908ce 100644
--- a/.github/actions/java-test/action.yaml
+++ b/.github/actions/java-test/action.yaml
@@ -29,6 +29,10 @@ inputs:
     description: 'Maven options passed to the mvn command'
     required: false
     default: ''
+  features:
+    description: 'Build native features (e.g. hdfs)'
+    required: false
+    default: ''
   scan_impl:
     description: 'The default Parquet scan implementation'
     required: false
@@ -45,7 +49,11 @@ runs:
       shell: bash
       run: |
         cd native
-        cargo build
+        FEATURES_ARG=""
+        if [ -n "${{ inputs.features }}" ]; then
+          FEATURES_ARG="--features=${{ inputs.features }}"
+        fi
+        cargo build $FEATURES_ARG
 
     - name: Cache Maven dependencies
       uses: actions/cache@v4
@@ -82,6 +90,10 @@ runs:
         MAVEN_SUITES="$(echo "${{ inputs.suites }}" | paste -sd, -)"
         echo "Running with MAVEN_SUITES=$MAVEN_SUITES"
         MAVEN_OPTS="-Xmx4G -Xms2G -DwildcardSuites=$MAVEN_SUITES -XX:+UnlockDiagnosticVMOptions -XX:+ShowMessageBoxOnError -XX:+HeapDumpOnOutOfMemoryError -XX:ErrorFile=./hs_err_pid%p.log" SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }}
+        # The Hadoop mini cluster may interfere with the Spark shutdown hook, so the tmp directory may not be fully cleaned up.
+        # Clean the tmp dir to avoid a hashFiles error.
+        ls spark/target/tmp
+        rm -rf spark/target/tmp
     - name: Upload crash logs
       if: failure()
       uses: actions/upload-artifact@v4
diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index c45355978e..c5651dc101 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -170,4 +170,42 @@ jobs:
           suites: ${{ matrix.suite.value }}
           maven_opts: ${{ matrix.profile.maven_opts }}
           scan_impl: ${{ matrix.profile.scan_impl }}
-          upload-test-reports: true
\ No newline at end of file
+          upload-test-reports: true
+
+  # Java tests with native features
+  linux-test-features:
+    strategy:
+      matrix:
+        os: [ubuntu-latest]
+        java_version: [17]
+        features:
+          - value: "hdfs-opendal"
+            suites: |
+              org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
+              org.apache.comet.parquet.ParquetReadFromHdfsSuite
+          - value: "hdfs"
+            suites: |
+              org.apache.comet.parquet.ParquetReadFromHdfsSuite
+      fail-fast: false
+    name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}]
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+      env:
+        JAVA_TOOL_OPTIONS: ${{ matrix.java_version == '17' && '--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED' || '' }}
+
+    steps:
+      - uses: actions/checkout@v5
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java_version }}
+      - name: Java test steps
+        uses: ./.github/actions/java-test
+        with:
+          artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
+          features: ${{ matrix.features.value }}
+          maven_opts: "-Dtest=none"
+          suites: ${{ matrix.features.suites }}
+          upload-test-reports: true
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index 6c71006e5f..7dab979b36 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -131,3 +131,39 @@ jobs:
           artifact_name: ${{ matrix.os }}-${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
           suites: ${{ matrix.suite.value }}
           maven_opts: ${{ matrix.profile.maven_opts }}
+
+  # Java tests with native features
+  macos-aarch64-test-features:
+    strategy:
+      matrix:
+        os: [macos-14]
+        java_version: [17]
+        features:
+          - value: "hdfs-opendal"
+            suites: |
+              org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
+              org.apache.comet.parquet.ParquetReadFromHdfsSuite
+          - value: "hdfs"
+            suites: |
+              org.apache.comet.parquet.ParquetReadFromHdfsSuite
+      fail-fast: false
+    name: ${{ matrix.os }}/java ${{ matrix.java_version }}-features [${{ matrix.features.value }}]
+    runs-on: ${{ matrix.os }}
+
+    steps:
+      - uses: actions/checkout@v5
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-macos-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java_version }}
+          jdk-architecture: aarch64
+          protoc-architecture: aarch_64
+      - name: Java test steps
+        uses: ./.github/actions/java-test
+        with:
+          artifact_name: ${{ matrix.os }}-java-${{ matrix.java_version }}-features-${{ matrix.features.value }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
+          features: ${{ matrix.features.value }}
+          maven_opts: "-Dtest=none"
+          suites: ${{ matrix.features.suites }}
+          upload-test-reports: true
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 16a8a7316d..e6828237ae 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -74,8 +74,8 @@ aws-credential-types = { workspace = true }
 parking_lot = "0.12.3"
 datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
 
-object_store_opendal = {version = "0.54.0", optional = true}
-hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
+object_store_opendal = { version = "0.54.0", optional = true }
+hdfs-sys = { version = "0.3", optional = true, features = ["hdfs_3_3"] }
 opendal = { version ="0.54.0", optional = true, features = ["services-hdfs"] }
 
 [target.'cfg(target_os = "linux")'.dependencies]
diff --git a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
index 49124d63e5..30d1581696 100644
--- a/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
+++ b/spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
@@ -74,6 +74,9 @@ trait WithHdfsCluster extends Logging {
   }
 
   def stopHdfsCluster(): Unit = {
+    // Wait for the HDFS async blocking threads to exit to avoid a JVM crash, see:
+    // https://github.com/apache/datafusion-comet/issues/2354
+    Thread.sleep(2000)
     if (hdfsCluster != null) hdfsCluster.shutdown(true)
     if (hadoopConfDir != null) FileUtils.deleteDirectory(hadoopConfDir)
   }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
index f4a8b5ed82..03f9437b29 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
@@ -26,7 +26,7 @@ import java.util.UUID
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
-import org.apache.spark.sql.comet.CometNativeScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{col, sum}
@@ -45,10 +45,10 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
   }
 
   override def beforeAll(): Unit = {
-    // Initialize fake root dir
-    fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile
     // Initialize Spark session
     super.beforeAll()
+    // Initialize fake root dir
+    fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile
   }
 
   protected override def afterAll(): Unit = {
@@ -62,16 +62,21 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
   }
 
   private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = {
-    val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
-      p
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometNativeScanExec =>
+        assert(
+          p.nativeOp.getNativeScan
+            .getFilePartitions(0)
+            .getPartitionedFile(0)
+            .getFilePath
+            .startsWith(FakeHDFSFileSystem.PREFIX))
+        p
+      case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+        assert(p.toString().contains(FakeHDFSFileSystem.PREFIX))
+        p
     }
     assert(scans.size == 1)
-    assert(
-      scans.head.nativeOp.getNativeScan
-        .getFilePartitions(0)
-        .getPartitionedFile(0)
-        .getFilePath
-        .startsWith(FakeHDFSFileSystem.PREFIX))
+
   }
 
   // This test fails for 'hdfs' but succeeds for 'open-dal'. 'hdfs' requires this fix
@@ -82,10 +87,13 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
     val testFilePath =
       s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
-    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
-      val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-      assertCometNativeScanOnFakeFs(df)
-      assert(df.first().getLong(0) == 499500)
+    Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
+      scanImpl =>
+        withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
+          val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+          assertCometNativeScanOnFakeFs(df)
+          assert(df.first().getLong(0) == 499500)
+        }
     }
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala
new file mode 100644
index 0000000000..719e03d6e9
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet
+
+import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.{col, sum}
+
+import org.apache.comet.{CometConf, WithHdfsCluster}
+
+class ParquetReadFromHdfsSuite
+    extends CometTestBase
+    with AdaptiveSparkPlanHelper
+    with WithHdfsCluster {
+
+  override protected def createSparkSession: SparkSessionType = {
+    val sparkSession = super.createSparkSession
+    // start HDFS cluster and add hadoop conf
+    startHdfsCluster()
+    sparkSession.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)
+    sparkSession
+  }
+
+  protected override def afterAll(): Unit = {
+    super.afterAll()
+    stopHdfsCluster()
+  }
+
+  private def writeTestParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000)
+    df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometNativeScanOnHDFS(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometNativeScanExec =>
+        assert(
+          p.nativeOp.getNativeScan
+            .getFilePartitions(0)
+            .getPartitionedFile(0)
+            .getFilePath
+            .startsWith("hdfs://"))
+        p
+      case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+        assert(p.toString().contains("hdfs://"))
+        p
+    }
+    assert(scans.size == 1)
+  }
+
+  test("test native_datafusion scan on hdfs") {
+    assume(isFeatureEnabled("hdfs") || isFeatureEnabled("hdfs-opendal"))
+
+    withTmpHdfsDir { dir =>
+      {
+        val testFilePath = dir.toString
+        writeTestParquetFile(testFilePath)
+        Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
+          scanImpl =>
+            withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
+              val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+              assertCometNativeScanOnHDFS(df)
+              assert(df.first().getLong(0) == 499500)
+            }
+        }
+      }
+    }
+  }
+}
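
For local verification, the new linux-test-features and macos-aarch64-test-features jobs boil down to roughly the commands below. This is a minimal sketch derived from the java-test action above, shown for the 'hdfs' feature (swap in 'hdfs-opendal' as needed); the action's extra JVM flags, Maven caching, and tmp-dir cleanup are omitted here.

    # Build the native library with the requested Cargo feature, as the java-test action does
    cd native && cargo build --features=hdfs && cd ..
    # Run only the HDFS-related suites via -DwildcardSuites, mirroring the action's Maven invocation
    MAVEN_SUITES="org.apache.comet.parquet.ParquetReadFromHdfsSuite"
    MAVEN_OPTS="-DwildcardSuites=$MAVEN_SUITES" SPARK_HOME=`pwd` ./mvnw -B clean install -Dtest=none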