3 changes: 3 additions & 0 deletions .github/workflows/build_and_test.yml
@@ -564,6 +564,9 @@ jobs:
run: ./dev/lint-scala
- name: Java linter
run: ./dev/lint-java
- name: Spark connect jvm client mima check
if: inputs.branch != 'branch-3.2' && inputs.branch != 'branch-3.3'
run: ./dev/connect-jvm-client-mima-check
- name: Install Python linter dependencies
run: |
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
7 changes: 0 additions & 7 deletions connector/connect/client/jvm/pom.xml
@@ -118,13 +118,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Use mima to perform the compatibility check -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>mima-core_${scala.binary.version}</artifactId>
<version>${mima.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
@@ -2739,7 +2739,7 @@ class Dataset[T] private[sql] (
sparkSession.analyze(plan, proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA)
}

def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)
Contributor:
Why make this private[sql]? For advanced use cases this is a better way of interacting with results.

private[sql] def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)

private[sql] def withResult[E](f: SparkResult[T] => E): E = {
val result = collectResult()
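For context on the review question above: `withResult` wraps `collectResult` and hands the `SparkResult` to a caller-supplied function. Below is a minimal, hypothetical sketch of how a caller might use it if these methods stayed public; the `SparkResult.length` accessor is assumed for illustration and may not match the actual client API.

```scala
import org.apache.spark.sql.Dataset

// Hypothetical usage sketch, not the actual client API surface.
// Assumes collectResult/withResult stay public and that SparkResult exposes
// a `length` accessor; withResult is expected to close the result afterwards.
object ResultUsageSketch {
  def countRows[T](ds: Dataset[T]): Int =
    ds.withResult(result => result.length)
}
```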
72 changes: 72 additions & 0 deletions dev/connect-jvm-client-mima-check
@@ -0,0 +1,72 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -o pipefail
set -e

# Go to the Spark project root directory
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
export SPARK_HOME=$FWDIR
echo $SPARK_HOME

if [[ -x "$JAVA_HOME/bin/java" ]]; then
JAVA_CMD="$JAVA_HOME/bin/java"
else
JAVA_CMD=java
fi

rm -f .connect-mima-check-result

TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | grep jar | tail -n1)"
CORE_CLASSPATH="$(build/sbt -DcopyDependencies=false "export core/fullClasspath" | grep jar | tail -n1)"
CATALYST_CLASSPATH="$(build/sbt -DcopyDependencies=false "export catalyst/fullClasspath" | grep jar | tail -n1)"

echo "Build sql module ..."
build/sbt sql/package
echo "Build connect-client-jvm module ..."
build/sbt connect-client-jvm/assembly

echo "Do connect-client-jvm module mima check ..."

$JAVA_CMD \
-Xmx2g \
-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.util.jar=ALL-UNNAMED \
-cp "$TOOLS_CLASSPATH:$CORE_CLASSPATH:$CATALYST_CLASSPATH" \
org.apache.spark.tools.CheckConnectJvmClientCompatibility

RESULT_SIZE=$(wc -l .connect-mima-check-result | awk '{print $1}')

# The file has no content if the check passed.
if [[ $RESULT_SIZE -eq "0" ]]; then
ERRORS=""
else
ERRORS=$(grep ERROR .connect-mima-check-result | tail -n1)
fi

if test ! -z "$ERRORS"; then
cat .connect-mima-check-result
echo -e "connect-client-jvm module mima check failed."
echo -e "Exceptions to binary compatibility can be added in tools/CheckConnectJvmClientCompatibility.scala"
Contributor:
Can we keep the file CheckConnectJvmClientCompatibility.scala inside client/jvm and make the tools module depend on the client test-jar? Then we would not need to copy-paste the findJar and recursiveListFiles methods, and it would also keep the code closer to the client.

Contributor Author:

By convention the tools module is independent and does not rely on any other Spark module, and I do not want to break that.

In addition, for the `compatibility API tests: Dataset` issue I mentioned in another PR, putting this inside the client module may cause unexpected class loading.

If we are sure we don't need the `compatibility API tests: Dataset` check, or there is another way to solve it, I can try to move CheckConnectJvmClientCompatibility back into the client module.

Contributor Author (@LuciferYang, Feb 28, 2023):
Another PR, #40213, keeps CheckConnectJvmClientCompatibility in client/jvm. If we all recommend keeping it in the client/jvm module, we can use that one.

Contributor:
Hi, I vote for change #40213. Thanks!

rm .connect-mima-check-result
exit 1
else
rm .connect-mima-check-result
echo -e "sql and connect-client-jvm module mima check passed."
fi
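In sbt terms, the test-jar suggestion from the review thread above would roughly amount to letting the tools project see the client's test classes. A hypothetical sketch follows; the project names and paths are illustrative and not taken from Spark's actual build definition.

```scala
// build.sbt sketch with illustrative project names; not Spark's real build.
lazy val connectClientJvm = (project in file("connector/connect/client/jvm"))

lazy val tools = (project in file("tools"))
  // A "test->test" dependency would let tools reuse the client's test helpers
  // (e.g. findJar / recursiveListFiles) instead of copy-pasting them.
  .dependsOn(connectClientJvm % "test->test")
```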
5 changes: 5 additions & 0 deletions tools/pom.xml
@@ -46,6 +46,11 @@
<artifactId>classutil_${scala.binary.version}</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>mima-core_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>

<build>
@@ -14,51 +14,71 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client

import java.io.File
package org.apache.spark.tools

import java.io.{File, Writer}
import java.net.URLClassLoader
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.regex.Pattern

import scala.reflect.runtime.universe.runtimeMirror

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib

import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._

/**
* This test checks the binary compatibility of the connect client API against the spark SQL API
* A tool for checking the binary compatibility of the connect client API against the Spark SQL API
* using MiMa. We did not write this check using a SBT build rule as the rule cannot provide the
* same level of freedom as a test. With a test we can:
* 1. Specify any two jars to run the compatibility check.
* 1. Easily make the test automatically pick up all new methods added while the client is being
* built.
*
* The test requires the following artifacts built before running:
* {{{
* spark-sql
* spark-connect-client-jvm
* }}}
* To build the above artifact, use e.g. `build/sbt package` or `build/mvn clean install
* -DskipTests`.
*
* When debugging this test, if any changes to the client API, the client jar need to be built
* before running the test. An example workflow with SBT for this test:
* 1. Compatibility test has reported an unexpected client API change.
* 1. Fix the wrong client API.
* 1. Build the client jar: `build/sbt package`
* 1. Run the test again: `build/sbt "testOnly
* org.apache.spark.sql.connect.client.CompatibilitySuite"`
* We can run this check by executing the `dev/connect-jvm-client-mima-check` script.
*/
class CompatibilitySuite extends ConnectFunSuite {
// scalastyle:off println
object CheckConnectJvmClientCompatibility {

private lazy val clientJar: File =
findJar(
"connector/connect/client/jvm",
"spark-connect-client-jvm-assembly",
"spark-connect-client-jvm")
private lazy val sparkHome: String = {
if (!sys.env.contains("SPARK_HOME")) {
throw new IllegalArgumentException("SPARK_HOME is not set.")
}
sys.env("SPARK_HOME")
}

private lazy val sqlJar: File = findJar("sql/core", "spark-sql", "spark-sql")
def main(args: Array[String]): Unit = {
var resultWriter: Writer = null
try {
resultWriter =
Files.newBufferedWriter(Paths.get(
s"$sparkHome/.connect-mima-check-result"), StandardCharsets.UTF_8)
val clientJar: File =
findJar("connector/connect/client/jvm", "spark-connect-client-jvm-assembly")
val sqlJar: File = findJar("sql/core", "spark-sql")
val problems = checkMiMaCompatibility(clientJar, sqlJar)
if (problems.nonEmpty) {
resultWriter.write(s"ERROR: Comparing client jar: $clientJar and and sql jar: $sqlJar \n")
resultWriter.write(s"problems: \n")
resultWriter.write(s"${problems.map(p => p.description("client")).mkString("\n")}")
resultWriter.write("\n")
}
val incompatibleApis = checkDatasetApiCompatibility(clientJar, sqlJar)
if (incompatibleApis.nonEmpty) {
resultWriter.write("ERROR: Dataset apis incompatible with the sql module include: \n")
resultWriter.write(incompatibleApis.mkString("\n"))
resultWriter.write("\n")
}
} catch {
case e: Exception =>
resultWriter.write(s"ERROR: ${e.getMessage}")
} finally {
if (resultWriter != null) {
resultWriter.close()
}
}
}

/**
* MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and then reports all
@@ -67,7 +87,7 @@ class CompatibilitySuite extends ConnectFunSuite {
* need to be checked. Then exclude rules are applied to filter out all unsupported methods in
* the client classes.
*/
test("compatibility MiMa tests") {
private def checkMiMaCompatibility(clientJar: File, sqlJar: File): List[Problem] = {
val mima = new MiMaLib(Seq(clientJar, sqlJar))
val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
val includedRules = Seq(
@@ -178,37 +198,72 @@

// TypedColumn
ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"))
val problems = allProblems
allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
}
.filter { p =>
excludeRules.forall(rule => rule(p))
}

if (problems.nonEmpty) {
fail(
s"\nComparing client jar: $clientJar\nand sql jar: $sqlJar\n" +
problems.map(p => p.description("client")).mkString("\n"))
}
}

test("compatibility API tests: Dataset") {
val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
val sqlClassLoader: URLClassLoader = new URLClassLoader(Seq(sqlJar.toURI.toURL).toArray)
private def checkDatasetApiCompatibility(clientJar: File, sqlJar: File): Seq[String] = {

val clientClass = clientClassLoader.loadClass("org.apache.spark.sql.Dataset")
val sqlClass = sqlClassLoader.loadClass("org.apache.spark.sql.Dataset")
def methods(jar: File, className: String): Seq[String] = {
val classLoader: URLClassLoader = new URLClassLoader(Seq(jar.toURI.toURL).toArray)
val mirror = runtimeMirror(classLoader)
// scalastyle:off classforname
val classSymbol =
mirror.classSymbol(Class.forName(className, false, classLoader))
// scalastyle:on classforname
classSymbol.typeSignature.members
.filter(_.isMethod)
.map(_.asMethod)
.filter(m => m.isPublic)
.map(_.fullName)
.toSeq
}

val newMethods = clientClass.getMethods
val oldMethods = sqlClass.getMethods
val className = "org.apache.spark.sql.Dataset"
val clientMethods = methods(clientJar, className)
val sqlMethods = methods(sqlJar, className)

// For now we simply check the new methods is a subset of the old methods.
newMethods
.map(m => m.toString)
.foreach(method => {
assert(oldMethods.map(m => m.toString).contains(method))
})
clientMethods.diff(sqlMethods)
}

/**
* Find a jar in the Spark project artifacts. It requires a build first (e.g. sbt package)
* so that this method can find the jar in the target folders.
*
* @return the jar
*/
private def findJar(path: String, sbtName: String): File = {
val targetDir = new File(new File(sparkHome, path), "target")
if (!targetDir.exists()) {
throw new IllegalStateException("Fail to locate the target folder: " +
s"'${targetDir.getCanonicalPath}'. " +
s"SPARK_HOME='${new File(sparkHome).getCanonicalPath}'. " +
"Make sure the spark project jars has been built (e.g. using sbt package)" +
"and the env variable `SPARK_HOME` is set correctly.")
}
val jars = recursiveListFiles(targetDir).filter { f =>
// SBT jar
f.getParentFile.getName.startsWith("scala-") &&
f.getName.startsWith(sbtName) && f.getName.endsWith(".jar")
}
// It is possible we found more than one: one built by maven, and another by SBT
if (jars.isEmpty) {
throw new IllegalStateException(
s"Failed to find the jar inside folder: ${targetDir.getCanonicalPath}")
}
println("Using jar: " + jars(0).getCanonicalPath)
jars(0) // return the first jar found
}

private def recursiveListFiles(f: File): Array[File] = {
val these = f.listFiles
these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}

private case class IncludeByName(name: String) extends ProblemFilter {
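Because the hunks above elide most of the include/exclude rules, here is a condensed, hypothetical sketch of the MiMa filtering pattern the tool applies. The MiMaLib and ProblemFilters calls mirror the diff above; the specific class and rule names are placeholders, not the real rule list.

```scala
import java.io.File
import java.util.regex.Pattern

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib

// Condensed sketch of the check performed above; rule names are placeholders.
object MimaCheckSketch {

  // Same shape as the tool's IncludeByName: a ProblemFilter that keeps only
  // problems whose matchName matches a (possibly wildcarded) class name.
  private case class IncludeByName(name: String) extends ProblemFilter {
    private val pattern =
      Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))
    override def apply(problem: Problem): Boolean =
      pattern.matcher(problem.matchName.getOrElse("")).matches()
  }

  def check(sqlJar: File, clientJar: File): List[Problem] = {
    val mima = new MiMaLib(Seq(clientJar, sqlJar))
    // Old jar (sql) vs. new jar (client), no excluded annotations.
    val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)

    // Only check the classes the client is supposed to mirror ...
    val includedRules = Seq(IncludeByName("org.apache.spark.sql.Dataset.*"))
    // ... and filter out members the client intentionally does not provide.
    val excludeRules = Seq(
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"))

    allProblems
      .filter(p => includedRules.exists(rule => rule(p)))
      .filter(p => excludeRules.forall(rule => rule(p)))
  }
}
```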