8 changes: 8 additions & 0 deletions connector/connect/client/jvm/pom.xml
@@ -33,6 +33,7 @@
<properties>
<sbt.project.name>connect-client-jvm</sbt.project.name>
<guava.version>31.0.1-jre</guava.version>
<mima.version>1.1.0</mima.version>
Contributor

The latest version is 1.1.1

Contributor Author

Yes, there is a bug in 1.1.1 where MiMa cannot check class methods if the object is marked private. So I used the same version that our SBT build uses, which is 1.1.0.

</properties>

<dependencies>
@@ -92,6 +93,13 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Use mima to perform the compatibility check -->
Member

Can we use SBT to check this instead of Maven? We have only one place for MiMa in SBT so far (see also project/MimaBuild.scala and dev/mima).

Contributor Author

The SBT MiMa check has some limitations when run as an SBT rule:
It is best suited to a stable API, e.g. current vs. previous. It is not very friendly to configure for checking e.g. scala-client vs. sql while we are actively working on the scala-client API.
To be more specific, the problems I hit were:

  1. I could not configure the MiMa rule to find the current SQL SNAPSHOT jar.
  2. I could not use the ClassLoader correctly in the SBT rule to load all methods in the client API.

As a result, I ended up with this test, where we have more freedom to grow the API check coverage along with the client API.
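
For reference, this is roughly what the sbt wiring for MiMa usually looks like. This is a minimal sketch assuming sbt-mima-plugin; the project name, coordinates, version, and filter below are illustrative, not the actual Spark build:

import com.typesafe.tools.mima.core._

lazy val connectClientJvm = (project in file("connector/connect/client/jvm"))
  .settings(
    // MiMa resolves the "previous" artifact from a repository, which is why
    // pointing it at a locally built spark-sql SNAPSHOT jar is awkward (problem 1).
    mimaPreviousArtifacts := Set("org.apache.spark" %% "spark-sql" % "3.4.0"),
    // Exclusions are expressed as ProblemFilters, much like the test in this PR does.
    mimaBinaryIssueFilters ++= Seq(
      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*")))

The check would then run via sbt connectClientJvm/mimaReportBinaryIssues; the friction described above comes from mimaPreviousArtifacts being resolved from a repository rather than from a locally built SNAPSHOT jar.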

Member

Gotcha. Let's probably add a couple of comments here and there to make it clear; I am sure this is confusing to other developers.

Contributor

cc @dongjoon-hyun, also cc @pan3793. Do you have any suggestions for this?

Contributor Author (@zhenlineo, Jan 25, 2023)

You can check out the MiMa SBT implementation I did here: zhenlineo#6
I marked the two problems in the PR code. Unless we can fix those two problems, I do not feel it is a better solution than this PR, which calls MiMa directly in a test.

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>mima-core_${scala.binary.version}</artifactId>
<version>${mima.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import scala.collection.JavaConverters._

import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Column.fn
import org.apache.spark.sql.connect.client.unsupported
import org.apache.spark.sql.functions.lit
@@ -44,7 +45,7 @@ import org.apache.spark.sql.functions.lit
*
* @since 3.4.0
*/
class Column private[sql] (private[sql] val expr: proto.Expression) {
class Column private[sql] (private[sql] val expr: proto.Expression) extends Logging {
Member

Seems like we're not using this Logging

Contributor Author (@zhenlineo, Jan 25, 2023)

The Logging is needed for binary compatibility: the class type must be exactly the same as in the SQL API.

Contributor

class Column(val expr: Expression) extends Logging {

should we delete private[sql] here?

Contributor Author

My limited Scala knowledge indicates this only marks the primary constructor private. The intention is to mark the current constructor private; more constructors will be added in follow-up PRs.
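
A minimal sketch of the point being made (hypothetical class name, not PR code): the modifier restricts only the primary constructor, and public auxiliary constructors can still be added later.

package org.apache.spark.sql

// Hypothetical example: private[sql] here restricts just the primary
// constructor, so the class itself stays public.
class ColumnSketch private[sql] (private[sql] val raw: String) {
  // A public auxiliary constructor added in a later PR; it chains to the
  // private[sql] primary constructor, which is allowed from inside the class.
  def this() = this("*")
}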

Contributor

Hmm... why is it not consistent with spark.sql.Column?

Contributor Author

Our type is proto.Expression, which is not the same as Expression. I will leave it to later PRs to decide how to support Expression.

Contributor

I mean, why not

class Column(val expr: proto.Expression) extends Logging { ...

Contributor Author

Because I am not certain whether we should expose the constructor this(expr: proto.Expression) and the val expr: proto.Expression. They are not the same types as this(expr: Expression) and val expr: Expression.

Our proto.Expression is a gRPC-generated class, while Expression is in the sql package. They are different types from the binary-compatibility point of view.

Contributor

Let's keep it private[sql] for now.


/**
* Sum of this expression and another expression.
@@ -80,7 +81,7 @@ class Column private[sql] (private[sql] val expr: proto.Expression) {
}
}

object Column {
private[sql] object Column {
Contributor (@LuciferYang, Jan 26, 2023)

Both Column and Column$ have private[sql] access scope with this PR, so this is not an API for users?

It seems users cannot create a Column in their own package with this PR, for example:

package org.apache.spark.test

import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
import org.apache.spark.sql.Column

class MyTestSuite
  extends AnyFunSuite // scalastyle:ignore funsuite
{
  test("new column") {
    val a = Column("a") // Symbol apply is inaccessible from this place
    val b = new Column(null) // No constructor accessible from here
  }
}

Contributor

I think org.apache.spark.sql.Column#apply was a public API before. If private[sql] is added to object Column, it may require more code refactoring work.

Contributor Author

Thanks for your input.

Looking at the current Column class, the SQL API gives two public ways to construct a Column:

class Column(val expr: Expression) extends Logging {

  def this(name: String) = this(name match {
    case "*" => UnresolvedStar(None)
    case _ if name.endsWith(".*") =>
      val parts = UnresolvedAttribute.parseAttributeName(name.substring(0, name.length - 2))
      UnresolvedStar(Some(parts))
    case _ => UnresolvedAttribute.quotedString(name)
  })
...

Right now the client API is very far from complete. We will add new methods in coming PRs, and I am sure there will be a Column(name: String) for users to use, but it is out of the scope of this PR to include all the public constructors the client needs.

The compatibility check added with this PR will grow its coverage as more methods are added to the client. The current check ensures that when a new method is added, it is binary compatible with the existing SQL API. When the client API coverage is up (~80%), we can switch to a more aggressive check to ensure we did not miss any methods by mistake.

Contributor

OK, I see what you mean. It seems that this is just an intermediate state, so it really doesn't need to consider user usage right now.


def apply(name: String): Column = Column { builder =>
name match {
@@ -21,7 +21,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.client.SparkResult

class Dataset(val session: SparkSession, private[sql] val plan: proto.Plan) {
class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
extends Serializable {

/**
* Selects a set of column based expressions.
@@ -33,7 +34,7 @@ class Dataset(val session: SparkSession, private[sql] val plan: proto.Plan) {
* @since 3.4.0
*/
@scala.annotation.varargs
def select(cols: Column*): Dataset = session.newDataset { builder =>
def select(cols: Column*): DataFrame = session.newDataset { builder =>
builder.getProjectBuilder
.setInput(plan.getRoot)
.addAllExpressions(cols.map(_.expr).asJava)
@@ -50,7 +51,7 @@ class Dataset(val session: SparkSession, private[sql] val plan: proto.Plan) {
* @group typedrel
* @since 3.4.0
*/
def filter(condition: Column): Dataset = session.newDataset { builder =>
def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
}

@@ -62,7 +63,7 @@ class Dataset(val session: SparkSession, private[sql] val plan: proto.Plan) {
* @group typedrel
* @since 3.4.0
*/
def limit(n: Int): Dataset = session.newDataset { builder =>
def limit(n: Int): Dataset[T] = session.newDataset { builder =>
builder.getLimitBuilder
.setInput(plan.getRoot)
.setLimit(n)
@@ -16,9 +16,12 @@
*/
package org.apache.spark.sql

import java.io.Closeable

import org.apache.arrow.memory.RootAllocator

import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
import org.apache.spark.sql.connect.client.util.Cleaner

@@ -43,7 +46,9 @@ import org.apache.spark.sql.connect.client.util.Cleaner
* }}}
*/
class SparkSession(private val client: SparkConnectClient, private val cleaner: Cleaner)
extends AutoCloseable {
extends Serializable
with Closeable
with Logging {

private[this] val allocator = new RootAllocator()

@@ -53,7 +58,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
def sql(query: String): Dataset = newDataset { builder =>
def sql(query: String): DataFrame = newDataset { builder =>
builder.setSql(proto.SQL.newBuilder().setQuery(query))
}

@@ -63,15 +68,15 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
def range(end: Long): Dataset = range(0, end)
def range(end: Long): Dataset[java.lang.Long] = range(0, end)

/**
* Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
* range from `start` to `end` (exclusive) with step value 1.
*
* @since 3.4.0
*/
def range(start: Long, end: Long): Dataset = {
def range(start: Long, end: Long): Dataset[java.lang.Long] = {
range(start, end, step = 1)
}

@@ -81,7 +86,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
def range(start: Long, end: Long, step: Long): Dataset = {
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
range(start, end, step, None)
}

@@ -91,11 +96,15 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = {
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
range(start, end, step, Option(numPartitions))
}

private def range(start: Long, end: Long, step: Long, numPartitions: Option[Int]): Dataset = {
private def range(
start: Long,
end: Long,
step: Long,
numPartitions: Option[Int]): Dataset[java.lang.Long] = {
newDataset { builder =>
val rangeBuilder = builder.getRangeBuilder
.setStart(start)
@@ -105,11 +114,11 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
}
}

private[sql] def newDataset(f: proto.Relation.Builder => Unit): Dataset = {
private[sql] def newDataset[T](f: proto.Relation.Builder => Unit): Dataset[T] = {
val builder = proto.Relation.newBuilder()
f(builder)
val plan = proto.Plan.newBuilder().setRoot(builder).build()
new Dataset(this, plan)
new Dataset[T](this, plan)
}

private[sql] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse =
@@ -130,7 +139,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:

// The minimal builder needed to create a spark session.
// TODO: implements all methods mentioned in the scaladoc of [[SparkSession]]
object SparkSession {
object SparkSession extends Logging {
def builder(): Builder = new Builder()

private lazy val cleaner = {
@@ -139,7 +148,7 @@ object SparkSession {
cleaner
}

class Builder() {
class Builder() extends Logging {
private var _client = SparkConnectClient.builder().build()

def client(client: SparkConnectClient): Builder = {
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

package object sql {
type DataFrame = Dataset[Row]
}
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client

import java.io.File
import java.net.URLClassLoader
import java.util.regex.Pattern

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._

/**
* This test checks the binary compatibility of the connect client API against the Spark SQL API
* using MiMa. We did not write this check as an SBT build rule because a build rule cannot provide
* the same level of freedom as a test. With a test we can:
* 1. Specify any two jars to run the compatibility check.
* 1. Easily make the test automatically pick up all new methods added while the client is being
* built.
*
* The test requires the following artifacts built before running:
* {{{
* spark-sql
* spark-connect-client-jvm
* }}}
* To build the above artifact, use e.g. `sbt package` or `mvn clean install -DskipTests`.
*
* When debugging this test, if the client API has changed, the client jar needs to be rebuilt
* before running the test. An example workflow with SBT for this test:
* 1. Compatibility test has reported an unexpected client API change.
* 1. Fix the wrong client API.
* 1. Build the client jar: `sbt package`
* 1. Run the test again: `sbt "testOnly
* org.apache.spark.sql.connect.client.CompatibilitySuite"`
*/
class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite

private lazy val clientJar: File =
findJar(
"connector/connect/client/jvm",
"spark-connect-client-jvm-assembly",
"spark-connect-client-jvm")

private lazy val sqlJar: File = findJar("sql/core", "spark-sql", "spark-sql")

/**
* MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and then reports all
* incompatibilities found in the new jar. The incompatibility result is then filtered using
* include and exclude rules. Include rules are first applied to find all client classes that
* need to be checked. Then exclude rules are applied to filter out all unsupported methods in
* the client classes.
*/
test("compatibility MiMa tests") {
val mima = new MiMaLib(Seq(clientJar, sqlJar))
Contributor (@LuciferYang, Jan 27, 2023)

Could we add assume(sys.env.contains("GITHUB_ACTIONS")) before line 69 to make this test run only in GitHub Actions? @HyukjinKwon @zhenlineo
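
A minimal sketch of the suggested guard inside the existing test (ScalaTest's assume cancels the test when the condition is false):

// Sketch: skip the MiMa check outside of GitHub Actions, as suggested above.
test("compatibility MiMa tests") {
  assume(sys.env.contains("GITHUB_ACTIONS"))
  val mima = new MiMaLib(Seq(clientJar, sqlJar))
  // ... rest of the check unchanged
}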

val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
val includedRules = Seq(
IncludeByName("org.apache.spark.sql.Column"),
IncludeByName("org.apache.spark.sql.Column$"),
IncludeByName("org.apache.spark.sql.Dataset"),
// TODO(SPARK-42175) Add the Dataset object definition
// IncludeByName("org.apache.spark.sql.Dataset$"),
IncludeByName("org.apache.spark.sql.DataFrame"),
IncludeByName("org.apache.spark.sql.SparkSession"),
IncludeByName("org.apache.spark.sql.SparkSession$")) ++ includeImplementedMethods(clientJar)
val excludeRules = Seq(
// Filter unsupported rules:
// Two sql overloading methods are marked experimental in the API and skipped in the client.
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
// Skip all shaded dependencies in the client.
ProblemFilters.exclude[Problem]("org.sparkproject.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"))
val problems = allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
}
.filter { p =>
excludeRules.forall(rule => rule(p))
}

if (problems.nonEmpty) {
fail(
s"\nComparing client jar: $clientJar\nand sql jar: $sqlJar\n" +
problems.map(p => p.description("client")).mkString("\n"))
}
}

test("compatibility API tests: Dataset") {
val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
val sqlClassLoader: URLClassLoader = new URLClassLoader(Seq(sqlJar.toURI.toURL).toArray)

val clientClass = clientClassLoader.loadClass("org.apache.spark.sql.Dataset")
Contributor (@LuciferYang, Feb 27, 2023)

Hi @zhenlineo @HyukjinKwon, there may be some problems with this test case. I added some logs, see:
https://github.com/apache/spark/compare/master...LuciferYang:spark:CompatibilitySuite?expand=1

From the logs, I found that both clientClass and sqlClass are loaded from file:/home/runner/work/spark/spark/connector/connect/client/jvm/target/scala-2.12/spark-connect-client-jvm_2.12-3.5.0-SNAPSHOT.jar, and the contents of newMethods and oldMethods are the same:

https://pipelines.actions.githubusercontent.com/serviceHosts/c184045e-b556-4e78-b8ef-fb37b2eda9a3/_apis/pipelines/1/runs/62963/signedlogcontent/14?urlExpires=2023-02-27T08%3A53%3A13.6716136Z&urlSigningMethod=HMACV1&urlSignature=XkRKqix4ZapzEeczn7ZhWpAFhSnwWW74UX%2BKUhocftc%3D

Contributor

At present, using this way of checking, at least 4 APIs should be reported as incompatible:

private[sql] def withResult[E](f: SparkResult => E): E
def collectResult(): SparkResult
private[sql] def analyze: proto.AnalyzePlanResponse
private[sql] val plan: proto.Plan

Because when using Java reflection, the above 4 methods are identified as public APIs, even though three of them are private[sql], and these four APIs do not exist in the Dataset of the sql module (see the sketch after this comment):

public java.lang.Object org.apache.spark.sql.Dataset.withResult(scala.Function1)$
public org.apache.spark.sql.connect.client.SparkResult org.apache.spark.sql.Dataset.collectResult()$
public org.apache.spark.connect.proto.AnalyzePlanResponse org.apache.spark.sql.Dataset.analyze()$
public org.apache.spark.connect.proto.Plan org.apache.spark.sql.Dataset.plan()$
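
A small, self-contained sketch of why reflection reports these as public (hypothetical names, not from the PR): Scala's qualified private is enforced by the compiler only and is erased to a public JVM method.

package org.example.demo

// Hypothetical illustration: private[demo] is a compile-time restriction;
// the emitted JVM method is public, so Class#getMethods returns it.
class WidgetDataset {
  private[demo] def analyze: String = "plan"
}

object ShowPublicMethods {
  def main(args: Array[String]): Unit = {
    // Prints "analyze" even though the Scala signature is qualified-private,
    // mirroring how withResult/analyze/plan surface in the client Dataset check.
    classOf[WidgetDataset].getMethods.map(_.getName).filter(_ == "analyze").foreach(println)
  }
}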

Contributor

also cc @hvanhovell

Contributor Author

Thanks so much for looking into this. The Dataset test is not as important as the MiMa test. I will check whether we can fix the issue you found; otherwise it should be safe to delete it, as the check is already covered by MiMa.

Contributor (@LuciferYang, Feb 27, 2023)

Thanks @zhenlineo, if it is already covered, we can delete it :)

Member

Thank you for the investigation, @LuciferYang.

val sqlClass = sqlClassLoader.loadClass("org.apache.spark.sql.Dataset")

val newMethods = clientClass.getMethods
val oldMethods = sqlClass.getMethods

// For now we simply check that the new methods are a subset of the old methods.
newMethods
.map(m => m.toString)
.foreach(method => {
assert(oldMethods.map(m => m.toString).contains(method))
})
}

/**
* Find all methods that are implemented in the client jar. Once all major methods are
* implemented we can switch to include all methods under the class using ".*" e.g.
* "org.apache.spark.sql.Dataset.*"
*/
private def includeImplementedMethods(clientJar: File): Seq[IncludeByName] = {
val clsNames = Seq(
"org.apache.spark.sql.Column",
// TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \
// the Dataset methods, as too many overload methods are missing.
// "org.apache.spark.sql.Dataset",
"org.apache.spark.sql.SparkSession")

val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
clsNames
.flatMap { clsName =>
val cls = clientClassLoader.loadClass(clsName)
// all distinct method names
cls.getMethods.map(m => s"$clsName.${m.getName}").toSet
}
.map { fullName =>
IncludeByName(fullName)
}
}

private case class IncludeByName(name: String) extends ProblemFilter {
private[this] val pattern =
Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*"))

override def apply(problem: Problem): Boolean = {
pattern.matcher(problem.matchName.getOrElse("")).matches
}
}
}