diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 2fea0d53b1631..6bab1335a2cb8 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -564,6 +564,9 @@ jobs:
run: ./dev/lint-scala
- name: Java linter
run: ./dev/lint-java
+ - name: Spark Connect JVM client MiMa check
+ if: inputs.branch != 'branch-3.2' && inputs.branch != 'branch-3.3'
+ run: ./dev/connect-jvm-client-mima-check
- name: Install Python linter dependencies
run: |
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml
index 7606795f8203a..31c6bb52ea273 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -118,13 +118,6 @@
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
-
-    <dependency>
-      <groupId>com.typesafe</groupId>
-      <artifactId>mima-core_${scala.binary.version}</artifactId>
-      <version>${mima.version}</version>
-      <scope>test</scope>
-    </dependency>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6ef205956306f..f08b92192f9fa 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2739,7 +2739,7 @@ class Dataset[T] private[sql] (
sparkSession.analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
}
- def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)
+ private[sql] def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)
private[sql] def withResult[E](f: SparkResult[T] => E): E = {
val result = collectResult()
diff --git a/dev/connect-jvm-client-mima-check b/dev/connect-jvm-client-mima-check
new file mode 100755
index 0000000000000..00061bfb8bd0e
--- /dev/null
+++ b/dev/connect-jvm-client-mima-check
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -o pipefail
+set -e
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+echo $SPARK_HOME
+
+if [[ -x "$JAVA_HOME/bin/java" ]]; then
+ JAVA_CMD="$JAVA_HOME/bin/java"
+else
+ JAVA_CMD=java
+fi
+
+rm -f .connect-mima-check-result
+
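+# Export the module classpaths used to run the compatibility checker
+# (the checker class and its mima-core dependency come from the tools module).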
+TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | grep jar | tail -n1)"
+CORE_CLASSPATH="$(build/sbt -DcopyDependencies=false "export core/fullClasspath" | grep jar | tail -n1)"
+CATALYST_CLASSPATH="$(build/sbt -DcopyDependencies=false "export catalyst/fullClasspath" | grep jar | tail -n1)"
+
+echo "Build sql module ..."
+build/sbt sql/package
+echo "Build connect-client-jvm module ..."
+build/sbt connect-client-jvm/assembly
+
+echo "Do connect-client-jvm module mima check ..."
+
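+# Run the checker; it writes its findings to $SPARK_HOME/.connect-mima-check-result
+# (the file stays empty when the check passes).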
+$JAVA_CMD \
+ -Xmx2g \
+ -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.util.jar=ALL-UNNAMED \
+ -cp "$TOOLS_CLASSPATH:$CORE_CLASSPATH:$CATALYST_CLASSPATH" \
+ org.apache.spark.tools.CheckConnectJvmClientCompatibility
+
+RESULT_SIZE=$(wc -l .connect-mima-check-result | awk '{print $1}')
+
+# The file has no content if the check passed.
+if [[ $RESULT_SIZE -eq "0" ]]; then
+ ERRORS=""
+else
+ ERRORS=$(grep ERROR .connect-mima-check-result | tail -n1)
+fi
+
+if test ! -z "$ERRORS"; then
+ cat .connect-mima-check-result
+ echo -e "connect-client-jvm module mima check failed."
+ echo -e "Exceptions to binary compatibility can be added in tools/CheckConnectJvmClientCompatibility.scala"
+ rm .connect-mima-check-result
+ exit 1
+else
+ rm .connect-mima-check-result
+ echo -e "sql and connect-client-jvm module mima check passed."
+fi
diff --git a/tools/pom.xml b/tools/pom.xml
index ce2aebe9e598a..bdb8707aa9a51 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -46,6 +46,11 @@
      <artifactId>classutil_${scala.binary.version}</artifactId>
      <version>1.5.1</version>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>mima-core_${scala.binary.version}</artifactId>
+      <version>1.1.0</version>
+    </dependency>
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/tools/src/main/scala/org/apache/spark/tools/CheckConnectJvmClientCompatibility.scala
similarity index 70%
rename from connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
rename to tools/src/main/scala/org/apache/spark/tools/CheckConnectJvmClientCompatibility.scala
index b96d2d372f317..5b5f5b6772679 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/CheckConnectJvmClientCompatibility.scala
@@ -14,51 +14,71 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.connect.client
-import java.io.File
+package org.apache.spark.tools
+
+import java.io.{File, Writer}
import java.net.URLClassLoader
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
import java.util.regex.Pattern
+
+import scala.reflect.runtime.universe.runtimeMirror
+
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
-import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._
-
/**
- * This test checks the binary compatibility of the connect client API against the spark SQL API
+ * A tool for checking the binary compatibility of the connect client API against the Spark SQL API
* using MiMa. We did not write this check using a SBT build rule as the rule cannot provide the
* same level of freedom as a test. With a test we can:
* 1. Specify any two jars to run the compatibility check.
* 1. Easily make the test automatically pick up all new methods added while the client is being
* built.
*
- * The test requires the following artifacts built before running:
- * {{{
- * spark-sql
- * spark-connect-client-jvm
- * }}}
- * To build the above artifact, use e.g. `build/sbt package` or `build/mvn clean install
- * -DskipTests`.
- *
- * When debugging this test, if any changes to the client API, the client jar need to be built
- * before running the test. An example workflow with SBT for this test:
- * 1. Compatibility test has reported an unexpected client API change.
- * 1. Fix the wrong client API.
- * 1. Build the client jar: `build/sbt package`
- * 1. Run the test again: `build/sbt "testOnly
- * org.apache.spark.sql.connect.client.CompatibilitySuite"`
+ * We can run this check by executing the `dev/connect-jvm-client-mima-check` script.
*/
-class CompatibilitySuite extends ConnectFunSuite {
+// scalastyle:off println
+object CheckConnectJvmClientCompatibility {
- private lazy val clientJar: File =
- findJar(
- "connector/connect/client/jvm",
- "spark-connect-client-jvm-assembly",
- "spark-connect-client-jvm")
+ private lazy val sparkHome: String = {
+ if (!sys.env.contains("SPARK_HOME")) {
+ throw new IllegalArgumentException("SPARK_HOME is not set.")
+ }
+ sys.env("SPARK_HOME")
+ }
- private lazy val sqlJar: File = findJar("sql/core", "spark-sql", "spark-sql")
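+ /**
+ * Entry point: problems found by the MiMa check and the Dataset API check are written to
+ * `$SPARK_HOME/.connect-mima-check-result`; `dev/connect-jvm-client-mima-check` fails when that
+ * file reports errors.
+ */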
+ def main(args: Array[String]): Unit = {
+ var resultWriter: Writer = null
+ try {
+ resultWriter =
+ Files.newBufferedWriter(Paths.get(
+ s"$sparkHome/.connect-mima-check-result"), StandardCharsets.UTF_8)
+ val clientJar: File =
+ findJar("connector/connect/client/jvm", "spark-connect-client-jvm-assembly")
+ val sqlJar: File = findJar("sql/core", "spark-sql")
+ val problems = checkMiMaCompatibility(clientJar, sqlJar)
+ if (problems.nonEmpty) {
+ resultWriter.write(s"ERROR: Comparing client jar: $clientJar and and sql jar: $sqlJar \n")
+ resultWriter.write(s"problems: \n")
+ resultWriter.write(s"${problems.map(p => p.description("client")).mkString("\n")}")
+ resultWriter.write("\n")
+ }
+ val incompatibleApis = checkDatasetApiCompatibility(clientJar, sqlJar)
+ if (incompatibleApis.nonEmpty) {
+ resultWriter.write("ERROR: Dataset apis incompatible with the sql module include: \n")
+ resultWriter.write(incompatibleApis.mkString("\n"))
+ resultWriter.write("\n")
+ }
+ } catch {
+ case e: Exception =>
+ resultWriter.write(s"ERROR: ${e.getMessage}")
+ } finally {
+ if (resultWriter != null) {
+ resultWriter.close()
+ }
+ }
+ }
/**
* MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and then reports all
@@ -67,7 +87,7 @@ class CompatibilitySuite extends ConnectFunSuite {
* need to be checked. Then exclude rules are applied to filter out all unsupported methods in
* the client classes.
*/
- test("compatibility MiMa tests") {
+ private def checkMiMaCompatibility(clientJar: File, sqlJar: File): List[Problem] = {
val mima = new MiMaLib(Seq(clientJar, sqlJar))
val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
val includedRules = Seq(
@@ -178,37 +198,72 @@ class CompatibilitySuite extends ConnectFunSuite {
// TypedColumn
ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"))
- val problems = allProblems
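+ // Keep only the problems covered by includedRules that are not filtered out by excludeRules.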
+ allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
}
.filter { p =>
excludeRules.forall(rule => rule(p))
}
-
- if (problems.nonEmpty) {
- fail(
- s"\nComparing client jar: $clientJar\nand sql jar: $sqlJar\n" +
- problems.map(p => p.description("client")).mkString("\n"))
- }
}
- test("compatibility API tests: Dataset") {
- val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
- val sqlClassLoader: URLClassLoader = new URLClassLoader(Seq(sqlJar.toURI.toURL).toArray)
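+ /**
+ * Compares the public API surface of org.apache.spark.sql.Dataset in the two jars: each jar is
+ * loaded with its own classloader, its public methods are listed via Scala reflection, and any
+ * client method without a counterpart in the sql jar is returned.
+ */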
+ private def checkDatasetApiCompatibility(clientJar: File, sqlJar: File): Seq[String] = {
- val clientClass = clientClassLoader.loadClass("org.apache.spark.sql.Dataset")
- val sqlClass = sqlClassLoader.loadClass("org.apache.spark.sql.Dataset")
+ def methods(jar: File, className: String): Seq[String] = {
+ val classLoader: URLClassLoader = new URLClassLoader(Seq(jar.toURI.toURL).toArray)
+ val mirror = runtimeMirror(classLoader)
+ // scalastyle:off classforname
+ val classSymbol =
+ mirror.classSymbol(Class.forName(className, false, classLoader))
+ // scalastyle:on classforname
+ classSymbol.typeSignature.members
+ .filter(_.isMethod)
+ .map(_.asMethod)
+ .filter(m => m.isPublic)
+ .map(_.fullName)
+ .toSeq
+ }
- val newMethods = clientClass.getMethods
- val oldMethods = sqlClass.getMethods
+ val className = "org.apache.spark.sql.Dataset"
+ val clientMethods = methods(clientJar, className)
+ val sqlMethods = methods(sqlJar, className)
// For now we simply check that the new methods are a subset of the old methods.
- newMethods
- .map(m => m.toString)
- .foreach(method => {
- assert(oldMethods.map(m => m.toString).contains(method))
- })
+ clientMethods.diff(sqlMethods)
+ }
+
+ /**
+ * Find a jar in the Spark project artifacts. It requires a build first (e.g. sbt package)
+ * so that this method can find the jar in the target folders.
+ *
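+ * @param path the module directory, relative to SPARK_HOME, whose target folder is searched
+ * @param sbtName the prefix of the jar file name produced by the SBT build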
+ * @return the jar
+ */
+ private def findJar(path: String, sbtName: String): File = {
+ val targetDir = new File(new File(sparkHome, path), "target")
+ if (!targetDir.exists()) {
+ throw new IllegalStateException("Fail to locate the target folder: " +
+ s"'${targetDir.getCanonicalPath}'. " +
+ s"SPARK_HOME='${new File(sparkHome).getCanonicalPath}'. " +
+ "Make sure the spark project jars has been built (e.g. using sbt package)" +
+ "and the env variable `SPARK_HOME` is set correctly.")
+ }
+ val jars = recursiveListFiles(targetDir).filter { f =>
+ // SBT jar
+ f.getParentFile.getName.startsWith("scala-") &&
+ f.getName.startsWith(sbtName) && f.getName.endsWith(".jar")
+ }
+ // It is possible we found more than one: one built by maven, and another by SBT
+ if (jars.isEmpty) {
+ throw new IllegalStateException(
+ s"Failed to find the jar inside folder: ${targetDir.getCanonicalPath}")
+ }
+ println("Using jar: " + jars(0).getCanonicalPath)
+ jars(0) // return the first jar found
+ }
+
+ private def recursiveListFiles(f: File): Array[File] = {
+ val these = f.listFiles
+ these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}
private case class IncludeByName(name: String) extends ProblemFilter {