
Conversation

@hvanhovell
Contributor

@hvanhovell hvanhovell commented Oct 2, 2023

What changes were proposed in this pull request?

This PR fixes shading for the Spark Connect Scala Client maven build. The following things are addressed:

  • Guava and protobuf are included in the shaded jars. These were missing and were causing users to see ClassNotFoundExceptions.
  • Fixed duplicate shading of Guava. We now use the relocation defined in the parent pom.
  • Fixed the duplicate Netty dependency (shaded and transitive). One was used by gRPC and the other was needed by Arrow. This was fixed by pulling Arrow into the shaded jar.
  • Use the same relocation package as the shading defined in the parent pom.

Why are the changes needed?

The maven artifacts for the Spark Connect Scala Client are currently broken.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manual tests.

Step 1: Build new shaded library and install it in local maven repository

build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests
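
Optionally, a quick check that the relocated Guava, protobuf and Arrow classes actually ended up in the shaded jar. This is only a rough sketch: the local repository path and the relocated package prefixes below are assumptions inferred from the pom snippets and stack traces in this thread, so adjust them if your build differs.

import java.util.jar.JarFile
import scala.jdk.CollectionConverters._

// Hypothetical path to the freshly installed shaded client jar; adjust the
// Scala suffix / version to match your local build.
val jarPath = sys.props("user.home") +
  "/.m2/repository/org/apache/spark/spark-connect-client-jvm_2.13/4.0.0-SNAPSHOT/" +
  "spark-connect-client-jvm_2.13-4.0.0-SNAPSHOT.jar"
val jar = new JarFile(jarPath)

// Relocated prefixes assumed from the org.sparkproject prefix seen in the
// stack traces further down; a non-zero count means the dependency was
// actually pulled into the shaded jar.
Seq(
  "org/sparkproject/guava/",               // relocated Guava
  "org/sparkproject/com/google/protobuf/", // relocated protobuf
  "org/sparkproject/org/apache/arrow/"     // relocated Arrow
).foreach { prefix =>
  val n = jar.entries().asScala.count(_.getName.startsWith(prefix))
  println(s"$prefix -> $n entries")
}
jar.close()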

Step 2: Start Connect Server

connector/connect/bin/spark-connect

Step 3: Launch REPL using the newly created library

This step requires coursier (https://get-coursier.io/) to be installed. The command below passes --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED to the launched JVM so that Arrow can access direct memory on JDK 17 (see the discussion further down).
cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED -M org.apache.spark.sql.application.ConnectRepl

Step 4: Run a bunch of commands:

// Check version
spark.version

// Run a simple query
{
  spark.range(1, 10000, 1)
    .select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2"))
    .groupBy($"group")
    .agg(
      avg($"v1").as("v1_avg"),
      avg($"v2").as("v2_avg"))
    .show()
}

// Run a streaming query
{
  import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
  val query_name = "simple_streaming"
  val stream = spark.readStream
    .format("rate")
    .option("numPartitions", "1")
    .option("rowsPerSecond", "10")
    .load()
    .withWatermark("timestamp", "10 milliseconds")
    .groupBy(window(col("timestamp"), "10 milliseconds"))
    .count()
    .selectExpr("window.start as timestamp", "count as num_events")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .trigger(ProcessingTimeTrigger.create("10 milliseconds"))
  // run for 20 seconds
  val query = stream.start()
  val start = System.currentTimeMillis()
  val end = System.currentTimeMillis() + 20 * 1000
  while (System.currentTimeMillis() < end) {
    println(s"time: ${System.currentTimeMillis() - start} ms")
    println(query.status)
    spark.sql(s"select * from ${query_name}").show()
    Thread.sleep(500)
  }
  query.stop()
}
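
As an additional hedged check from the same REPL session, the relocated classes can be confirmed to resolve at runtime. The class names below are assumptions inferred from the relocation patterns and stack traces in this thread; adjust the prefixes if they differ in your build.

// Hypothetical relocated class names; adjust if your build uses a
// different spark.shade.packageName.
Seq(
  "org.sparkproject.guava.cache.CacheBuilder",          // relocated Guava
  "org.sparkproject.com.google.protobuf.Message",       // relocated protobuf
  "org.sparkproject.org.apache.arrow.memory.ArrowBuf"   // relocated Arrow
).foreach { name =>
  println(s"$name -> " + scala.util.Try(Class.forName(name)).isSuccess)
}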

<include>com.google.code.findbugs:*</include>
<include>com.google.code.gson:*</include>
<include>com.google.errorprone:*</include>
<include>com.google.guava:*</include>
Contributor Author


Guava is included by the parent pom.xml.

<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
Contributor Author


This was probably broken as well.

@hvanhovell hvanhovell requested a review from LuciferYang October 2, 2023 04:39
Contributor

@LuciferYang LuciferYang left a comment


I verified this on branch-3.5: the PR works fine with Java 8 and Java 11, so I believe the fix is effective. However, the sample code given in the PR description may need to be changed to:

spark.range(1, 10000, 1).
  select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2")).
  groupBy($"group").
  agg(
    avg($"v1").as("v1_avg"),
    avg($"v2").as("v2_avg")).
  show()

The . seems to need to go at the end of a line, not at the beginning.

For example, if it's written as:

spark.range(1, 10000, 1)
  .select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2"))
  .groupBy($"group")
  .agg(
    avg($"v1").as("v1_avg"),
    avg($"v2").as("v2_avg"))
  .show()

ammonite-repl seems to treat spark.range(1, 10000, 1) as a complete statement, and errors like (console):1:3 expected end-of-input are reported for the subsequent lines. I'm not sure whether this has anything to do with my macOS environment.


Unrelated to this PR, but I found that the example cannot run normally under Java 17. For example, I start ammonite-repl on Master using the following command:

coursier launch --scala 2.13.12 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT -M org.apache.spark.sql.application.ConnectRepl

Then when I execute

spark.range(1, 10000, 1).
  select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2")).
  groupBy($"group").
  agg(
    avg($"v1").as("v1_avg"),
    avg($"v2").as("v2_avg")).
  show()

for testing, the following error occurred:

java.lang.RuntimeException: Failed to initialize MemoryUtil. Was Java started with `--add-opens=java.base/java.nio=ALL-UNNAMED`? (See https://arrow.apache.org/docs/java/install.html)
  org.sparkproject.org.apache.arrow.memory.util.MemoryUtil.<clinit>(MemoryUtil.java:143)
  org.sparkproject.org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
  org.sparkproject.org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
  org.sparkproject.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:87)
  org.sparkproject.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:728)
  org.sparkproject.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:67)
  org.apache.spark.sql.connect.client.arrow.MessageIterator.hasNext(ConcatenatingArrowStreamReader.scala:159)
  org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:126)
  org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  org.apache.spark.sql.Dataset.$anonfun$show$2(Dataset.scala:535)
  org.apache.spark.sql.Dataset.$anonfun$show$2$adapted(Dataset.scala:534)
  org.apache.spark.sql.Dataset.withResult(Dataset.scala:3350)
  org.apache.spark.sql.Dataset.show(Dataset.scala:534)
  org.apache.spark.sql.Dataset.show(Dataset.scala:450)
  org.apache.spark.sql.Dataset.show(Dataset.scala:405)
  org.apache.spark.sql.Dataset.show(Dataset.scala:414)
  ammonite.$sess.cmd0$Helper.<init>(cmd0.sc:6)
  ammonite.$sess.cmd0$.<clinit>(cmd0.sc:7)
java.lang.reflect.InaccessibleObjectException: Unable to make field long java.nio.Buffer.address accessible: module java.base does not "opens java.nio" to unnamed module @3196eefd
  java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
  java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
  java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
  java.lang.reflect.Field.setAccessible(Field.java:172)
  org.sparkproject.org.apache.arrow.memory.util.MemoryUtil.<clinit>(MemoryUtil.java:88)
  org.sparkproject.org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
  org.sparkproject.org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
  org.sparkproject.org.apache.arrow.vector.ipc.ReadChannel.readFully(ReadChannel.java:87)
  org.sparkproject.org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:728)
  org.sparkproject.org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:67)
  org.apache.spark.sql.connect.client.arrow.MessageIterator.hasNext(ConcatenatingArrowStreamReader.scala:159)
  org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:126)
  org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
  org.apache.spark.sql.Dataset.$anonfun$show$2(Dataset.scala:535)
  org.apache.spark.sql.Dataset.$anonfun$show$2$adapted(Dataset.scala:534)
  org.apache.spark.sql.Dataset.withResult(Dataset.scala:3350)
  org.apache.spark.sql.Dataset.show(Dataset.scala:534)
  org.apache.spark.sql.Dataset.show(Dataset.scala:450)
  org.apache.spark.sql.Dataset.show(Dataset.scala:405)
  org.apache.spark.sql.Dataset.show(Dataset.scala:414)
  ammonite.$sess.cmd0$Helper.<init>(cmd0.sc:6)
  ammonite.$sess.cmd0$.<clinit>(cmd0.sc:7)
...

I tried two solutions:

  1. Use -J to pass parameters:
    run coursier launch --scala 2.13.12 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT -M org.apache.spark.sql.application.ConnectRepl -J"--add-opens=java.base/java.nio=ALL-UNNAMED"
  2. Use JAVA_OPTS environment variable to pass parameters:
    2.1 export JAVA_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED"
    2.2 run coursier launch --scala 2.13.12 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT -M org.apache.spark.sql.application.ConnectRepl

However, the above error is still reported. For branch-3.5, the result is the same when tested with Java 17.

@hvanhovell Do you know how to solve this problem? I'm not very familiar with Ammonite itself, so I'm not sure how to pass the --add-opens related options to it. Also cc @vicennial

Contributor

@LuciferYang LuciferYang left a comment


LGTM if tests pass

@hvanhovell
Contributor Author

@LuciferYang I have updated the code examples; they will now run in Ammonite. I am looking at the Java 17 issue.

@hvanhovell
Contributor Author

hvanhovell commented Oct 2, 2023

@LuciferYang I have updated the coursier command to fix the JDK 17 issue; it now passes --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED to the launched JVM (see Step 3 above).

Contributor

@LuciferYang LuciferYang left a comment


LGTM

The new command is ok.

@LuciferYang
Contributor

Final question: should the results of the Maven shade and sbt assembly builds be consistent? This PR seems to only fix the Maven build.

@hvanhovell
Contributor Author

@LuciferYang AFAICT the SBT build is working at the moment. However, we release using Maven, and that is currently very much broken. I am happy to homogenize the SBT and Maven builds in a follow-up.

@hvanhovell
Contributor Author

Merging.

@hvanhovell hvanhovell closed this in e53abbb Oct 2, 2023
hvanhovell added a commit that referenced this pull request Oct 2, 2023
…Client


Closes #43195 from hvanhovell/SPARK-45371.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit e53abbb)
Signed-off-by: Herman van Hovell <herman@databricks.com>
<shadedPattern>${spark.shade.packageName}.com.google</shadedPattern>
<excludes>
<!-- Guava is relocated to ${spark.shade.packageName}.guava (see the parent pom.xml) -->
<exclude>com.google.common.**</exclude>
Contributor


Hi @hvanhovell, Connect uses a separate version of Guava; is it expected for Guava to use the parent pom's relocation directly here?

LuciferYang pushed a commit that referenced this pull request Apr 1, 2024
### What changes were proposed in this pull request?

This PR aims to correct the relocation of the Connect Guava dependency and to remove the duplicate connect-common module from the SBT-built jars.

This PR cherry-picks #43436 and #44801 as a backport to the 3.5 branch.

### Why are the changes needed?

Bugfix

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Follow the steps described at #43195 (comment) to test manually.

In addition, I will continue to observe the GA situation over the coming days.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45775 from Yikf/branch-3.5.

Authored-by: yikaifei <yikaifei@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>