SPARKC-704 - Spark 3.5.0 support #1363

Merged: 12 commits, Feb 2, 2024
4 changes: 4 additions & 0 deletions CHANGES.txt
@@ -1,3 +1,7 @@

3.5.0
* Support for Apache Spark 3.5 (SPARKC-704)

3.4.1
* Scala 2.13 support (SPARKC-686)

10 changes: 6 additions & 4 deletions README.md
@@ -19,7 +19,7 @@ Spark RDDs and Datasets/DataFrames to Cassandra tables, and execute arbitrary CQL
in your Spark applications.

- Compatible with Apache Cassandra version 2.1 or higher (see table below)
- Compatible with Apache Spark 1.0 through 3.4 ([see table below](#version-compatibility))
- Compatible with Apache Spark 1.0 through 3.5 ([see table below](#version-compatibility))
- Compatible with Scala 2.11, 2.12 and 2.13
- Exposes Cassandra tables as Spark RDDs and Datasets/DataFrames
- Maps table rows to CassandraRow objects or tuples
@@ -45,15 +45,17 @@ corresponds to the 1.6 release. The "master" branch will normally contain
development for the next connector release in progress.

Currently, the following branches are actively supported:
3.4.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)),
3.5.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)),
3.4.x ([b3.4](https://github.com/datastax/spark-cassandra-connector/tree/b3.4)),
3.3.x ([b3.3](https://github.com/datastax/spark-cassandra-connector/tree/b3.3)),
3.2.x ([b3.2](https://github.com/datastax/spark-cassandra-connector/tree/b3.2)),
3.1.x ([b3.1](https://github.com/datastax/spark-cassandra-connector/tree/b3.1)),
3.0.x ([b3.0](https://github.com/datastax/spark-cassandra-connector/tree/b3.0)) and
2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)).

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|-----------------------| --------------------- | -------------------- |--------------------------|
|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
@@ -105,7 +107,7 @@ This project is available on the Maven Central Repository.
For SBT to download the connector binaries, sources and javadoc, put this in your project
SBT config:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"

* The default Scala version for Spark 3.0+ is 2.12; please choose the appropriate build. See the
[FAQ](doc/FAQ.md) for more information.
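For reference, a minimal `build.sbt` sketch using the 3.5.0 coordinate; the Spark dependency, its `provided` scope, and the Scala version shown here are assumptions for a typical application project, not something the README prescribes:

```scala
// Sketch of an application build; only the connector coordinate comes from the README above.
scalaVersion := "2.12.18" // connector 3.5 is also published for Scala 2.13

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "3.5.0" % "provided", // assumed Spark dependency
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"
)
```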
@@ -248,7 +248,7 @@ trait SparkCassandraITSpecBase

def getCassandraScan(plan: SparkPlan): CassandraScan = {
plan.collectLeaves.collectFirst{
case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _, _, _, _) => cassandraScan
case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _) => cassandraScan
}.getOrElse(throw new IllegalArgumentException("No Cassandra Scan Found"))
}

@@ -47,7 +47,7 @@ trait SaiBaseSpec extends Matchers with SparkCassandraITSpecBase {

def findCassandraScan(plan: SparkPlan): CassandraScan = {
plan match {
case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan
case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan
case filter: FilterExec => findCassandraScan(filter.child)
case project: ProjectExec => findCassandraScan(project.child)
case _ => throw new NoSuchElementException("RowDataSourceScanExec was not found in the given plan")
@@ -274,10 +274,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC
if (pushDown)
withClue(s"Given Dataframe plan does not contain CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") {
df.queryExecution.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
case b@AdaptiveSparkPlanExec(_, _, _, _, _) =>
b.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
}
} shouldBe defined
}
@@ -288,7 +288,7 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC
private def assertOnAbsenceOfCassandraInJoin(df: DataFrame): Unit =
withClue(s"Given Dataframe plan contains CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") {
df.queryExecution.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
} shouldBe empty
}

@@ -7,6 +7,6 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
object CatalystUtil {

def findCassandraScan(sparkPlan: SparkPlan): Option[CassandraScan] = {
sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan}
sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan}
}
}
@@ -1,6 +1,7 @@
package org.apache.spark.sql.datastax.test.empty

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -33,7 +34,7 @@ class DefaultSource extends StreamSourceProvider {
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
Dataset.ofRows(sqlContext.sparkSession, LocalRelation(schema.toAttributes, isStreaming = true))
Dataset.ofRows(sqlContext.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), isStreaming = true))
}

override def stop() {}
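The `getBatch` change above (and the matching one in the monotonic test source below) reflects a Spark 3.5 API move: attribute conversion now goes through `org.apache.spark.sql.catalyst.types.DataTypeUtils` rather than `schema.toAttributes`. A minimal sketch of the new call, with an illustrative schema standing in for the source's actual streaming schema:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Illustrative schema; the test sources above use their own streaming schema.
val schema = StructType(Seq(StructField("value", IntegerType)))

// Spark 3.5: convert the schema to attributes via DataTypeUtils instead of schema.toAttributes.
val relation = LocalRelation(DataTypeUtils.toAttributes(schema), isStreaming = true)
```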
@@ -1,6 +1,7 @@
package org.apache.spark.sql.datastax.test.monotonic

import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset, Source}
@@ -43,7 +44,7 @@ class DefaultSource extends StreamSourceProvider {
}
val rows = (startValue.toInt to endValue.toInt).map( value =>
new GenericInternalRow(values = Array(value)))
Dataset.ofRows(spark.sparkSession, LocalRelation(schema.toAttributes, rows, isStreaming = true))
Dataset.ofRows(spark.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), rows, isStreaming = true))
}

override def stop() {}
@@ -113,6 +113,8 @@ case class CassandraBulkWrite(

override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = getWriterFactory()

override def useCommitCoordinator(): Boolean = super.useCommitCoordinator()

private def getWriterFactory(): CassandraDriverDataWriterFactory = {
CassandraDriverDataWriterFactory(
connector,
@@ -3,7 +3,7 @@ package com.datastax.spark
import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, SparkPartitionLimit}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}

import scala.language.implicitConversions
@@ -65,7 +65,7 @@ package object connector {
new CassandraTableScanRDDFunctions(rdd)

implicit def toDataFrameFunctions(dataFrame: DataFrame): DatasetFunctions[Row] =
new DatasetFunctions[Row](dataFrame)(RowEncoder(dataFrame.schema))
new DatasetFunctions[Row](dataFrame)(ExpressionEncoder(dataFrame.schema))

implicit def toDatasetFunctions[K: Encoder](dataset: Dataset[K]): DatasetFunctions[K] =
new DatasetFunctions[K](dataset)
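The encoder swap above is the same kind of Spark 3.5 adjustment: the connector now obtains an `Encoder[Row]` from `ExpressionEncoder(schema)` instead of `RowEncoder(schema)`. A small sketch of that call; the helper name is hypothetical:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper, shown only to illustrate the Spark 3.5 call:
// ExpressionEncoder(schema) builds an Encoder[Row] for the DataFrame's schema.
def rowEncoderFor(df: DataFrame): ExpressionEncoder[Row] =
  ExpressionEncoder(df.schema)
```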
@@ -147,7 +147,7 @@ object CassandraDirectJoinStrategy extends Logging {
*/
def getScanExec(plan: SparkPlan): Option[BatchScanExec] = {
plan.collectFirst {
case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => exec
case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _) => exec
}
}

@@ -205,7 +205,7 @@ object CassandraDirectJoinStrategy extends Logging {
def hasCassandraChild[T <: QueryPlan[T]](plan: T): Boolean = {
plan.children.size == 1 && plan.children.exists {
case DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _, _) => true
case BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => true
case BatchScanExec(_, _: CassandraScan, _, _, _, _) => true
case _ => false
}
}
@@ -238,7 +238,7 @@ object CassandraDirectJoinStrategy extends Logging {
originalOutput: Seq[Attribute]): SparkPlan = {
val reordered = plan match {
//This may be the only node in the Plan
case BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => directJoin
case BatchScanExec(_, _: CassandraScan, _, _, _, _) => directJoin
// Plan has children
case normalPlan => normalPlan.transform {
case penultimate if hasCassandraChild(penultimate) =>
10 changes: 5 additions & 5 deletions doc/0_quick_start.md
@@ -15,15 +15,15 @@ Configure a new Scala project with the Apache Spark dependency.

The dependencies are easily retrieved via Maven Central

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.4.1"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.5.0"

The spark-packages libraries can also be used with spark-submit and spark-shell; these
commands will place the connector and all of its dependencies on the path of the
Spark Driver and all Spark Executors.

$SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
$SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
$SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0
$SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0

For the list of available versions, see:
- https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.12/

@@ -42,7 +42,7 @@ and *all* of its dependencies on the Spark Class Path
To configure the default Spark Configuration, pass key-value pairs with `--conf`

$SPARK_HOME/bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

This command would set the Spark Cassandra Connector parameter
2 changes: 1 addition & 1 deletion doc/13_spark_shell.md
@@ -18,7 +18,7 @@ Find additional versions at [Spark Packages](https://repo1.maven.org/maven2/com/
```bash
cd spark/install/dir
#Include the --master if you want to run against a spark cluster and not local mode
./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 --conf spark.cassandra.connection.host=yourCassandraClusterIp
./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 --conf spark.cassandra.connection.host=yourCassandraClusterIp
```

By default Spark will log everything to the console and this may be a bit of an overload. To change this, copy and modify the `log4j.properties` template file
2 changes: 1 addition & 1 deletion doc/15_python.md
@@ -14,7 +14,7 @@ shell similarly to how the spark shell is started. The preferred method is now t

```bash
./bin/pyspark \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
```

4 changes: 2 additions & 2 deletions project/Versions.scala
@@ -13,8 +13,8 @@ object Versions {
val JUnitInterface = "0.11"
val Mockito = "1.10.19"

val ApacheSpark = "3.4.1"
val SparkJetty = "9.4.50.v20221201"
val ApacheSpark = "3.5.0"
val SparkJetty = "9.4.51.v20230217"
val SolrJ = "8.3.0"

val ScalaCompat = "2.11.0"