
Spark 3.4+ / -Connect support #195

Open

Activity

Jolanrensen (Collaborator, Author) commented on Apr 18, 2023

Seems like ScalaReflection.scala has now deviated too far from KotlinReflection.scala, which would require a major overhaul to keep functioning. Maybe it's time to try a new approach, such as #178, which would hugely improve maintainability as well as fix most of the compatibility issues we face.

This will require time and investment I'm not sure I have alongside my work on DataFrame (especially given the low number of downloads this library currently has).
Let me know if you are still an avid user and would like me to invest time into rebuilding the hacky base this library is built upon!

hawkaa commented on May 12, 2023

Hi. I've just started looking at Spark 3.4, and the first issue we ran into was that this library doesn't support it yet. It would definitely be a big win if we could support it. 🙏

zaleslaw commented on May 12, 2023

Please upvote the top comment if you need this, or write here like @hawkaa did!

shane-atg commented on Jul 22, 2023

I am very interested in keeping this alive as well.

NatElkins commented on Sep 12, 2023

What would be the next step towards moving forward with this?

Jolanrensen (Collaborator, Author) commented on Sep 13, 2023

What would be the next step towards moving forward with this?

The next step would be to investigate a new way to encode Kotlin data classes (both at the top level of DataFrames and inside columns) and to keep inferring encoders from types without using KotlinReflection.scala, such that it's compatible with all versions of Scala and Spark 3.X by default. That way we can keep the API largely the same while improving maintainability, compatibility, and stability.
I'm not sure which mechanisms of Spark we can leverage for this; I was thinking of maybe using UDTs and a compiler plugin/annotation processor to generate the UDT classes... but that won't work for top-level tables.
Unfortunately, I'm too occupied with Kotlin DataFrame at the moment, but if someone could provide a proof of concept I'm sure I can provide some help :).

BierDav commented on Sep 14, 2023

So if I understood that correctly, we will be able to create a Spark Dataset from a Kotlin DataFrame? That's exactly what I wanted to do, because working with Spark Datasets is not that smooth.

Btw, is there currently a workaround for this?

Jolanrensen (Collaborator, Author) commented on Sep 14, 2023

So if I understood that correctly, we will be able to create a Spark Dataset from a Kotlin DataFrame? That's exactly what I wanted to do, because working with Spark Datasets is not that smooth.

Btw, is there currently a workaround for this?

No, that's currently not on the roadmap. They're two separate projects, although we are exploring interop with other databases in DataFrame (Kotlin/dataframe#408), including Spark.

If you want to convert from Kotlin DataFrame to Spark DataSets, that's actually quite simple:

@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Person> = df.toList().toDS()
}

Note that df.toList()/df.toListOf<>() only works if the return type is a data class, which is also what's needed for Spark.

If you want to be able to convert any Kotlin DataFrame to a Spark Dataset<Row>, we'll need to convert the schema as well:

/**
 * Converts the DataFrame to a Spark Dataset of Rows using the provided SparkSession and JavaSparkContext.
 *
 * @param spark The SparkSession object to use for creating the DataFrame.
 * @param sc The JavaSparkContext object to use for converting the DataFrame to RDD.
 * @return A Dataset of Rows representing the converted DataFrame.
 */
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
    return spark.createDataFrame(rows, schema().toSpark())
}

/**
 * Converts a DataRow to a Spark Row object.
 *
 * @return The converted Spark Row.
 */
fun DataRow<*>.toSpark(): Row =
    RowFactory.create(
        *values().map {
            when (it) {
                is DataRow<*> -> it.toSpark()
                else -> it
            }
        }.toTypedArray()
    )

/**
 * Converts a DataFrameSchema to a Spark StructType.
 *
 * @return The converted Spark StructType.
 */
fun DataFrameSchema.toSpark(): StructType =
    DataTypes.createStructType(
        columns.map { (name, schema) ->
            DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
        }
    )

/**
 * Converts a ColumnSchema object to Spark DataType.
 *
 * @return The Spark DataType corresponding to the given ColumnSchema object.
 * @throws IllegalArgumentException if the column type or kind is unknown.
 */
fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

/**
 * Returns the corresponding Spark DataType for a given Kotlin type.
 *
 * @return The Spark DataType that corresponds to the Kotlin type, or null if no matching DataType is found.
 */
fun KType.toSpark(): DataType? = when(this) {
    typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
    typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
    typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
    typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
    typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
    typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
    typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
    typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
    typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
    typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
    typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
    typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
    typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
    typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
    else -> null
}

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Row> = df.toSpark(spark, sc)
}

Edit: for conversion the other way around, check the Wiki: https://github.com/Kotlin/kotlin-spark-api/wiki/Kotlin-DataFrame-interoperability

Changed the title from "Spark 3.4 support" to "Spark 3.4+ support" on Sep 15, 2023
NatElkins commented on Sep 19, 2023

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

Jolanrensen (Collaborator, Author) commented on Sep 19, 2023

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

Sure! But I gotta give a warning. I'm not the original author of the patch, just the maintainer of the rest of the library, so this will be my best understanding of what's going on.

One of the biggest features of the Kotlin Spark API is automatically recognizing types and encoding them for Datasets. Without this, encoders would need to be given explicitly, as is the case for the Java API of Spark. This is the difference between:

spark.createDataset(listOf(1, 2, 3), Encoders.INT())
// and
listOf(1, 2, 3, 4, 5).toDS()

or even

data class Person1 @JvmOverloads constructor(
    var name: String? = null,
    var age: Int? = null,
) : Serializable

spark.createDataset(listOf(Person1("A", 1)), Encoders.bean(Person1::class.java))

// and
data class Person2(
    val name: String,
    val age: Int,
)
listOf(Person2("A", 1)).toDS()

To do this, we need to automatically generate an encoder based on the typing information provided by the reified type parameter of toDS<>() and the encoder<>() function.
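
Roughly, the entry points look something like this (a simplified sketch for illustration, not the library's exact signatures):

import kotlin.reflect.KType
import kotlin.reflect.typeOf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

// The reified type parameter captures the full Kotlin type, from which an Encoder is derived.
inline fun <reified T> encoder(): Encoder<T> = encoderFor(typeOf<T>())

inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
    spark.createDataset(this, encoder<T>())

// Deriving an Encoder from a KType is the hard part: a few primitives are trivial,
// but data classes, tuples, collections, nullability, etc. need custom (de)serializer
// logic, which is what KotlinReflection.scala currently provides.
@Suppress("UNCHECKED_CAST")
fun <T> encoderFor(type: KType): Encoder<T> = when (type.classifier) {
    Int::class -> Encoders.INT()
    Long::class -> Encoders.LONG()
    String::class -> Encoders.STRING()
    else -> error("no encoder for $type yet; this is exactly the part that needs replacing")
} as Encoder<T>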

If you follow the generateEncoder function in the same file, you can see we attempt to create an Encoder from the given KType. This can be either a predefined encoder (from ENCODERS) or something we can generate an Encoder for on the fly: a data class, Map, iterable, Array, UDT, etc.

An (Expression)Encoder needs two things: an Expression to serialize an object and one to deserialize it. Functions to create these serializers and deserializers exist in ScalaReflection.scala, but they can of course only handle types supported by Spark itself. We want to be able to create (de)serializers for Kotlin data classes, plus tuples inside data classes, arrays inside tuples inside data classes inside maps, etc. So we need the logic in that file but somehow have to inject extra functionality. At least, that's what @asm0dey likely thought when making it :). The downside is that we keep bumping into Spark-internal functions if we want to call it from the Kotlin side (not even mentioning the incompatibilities between Kotlin and Scala). So, a new module was created using the same org.apache.spark.sql package (to be able to call Spark-internal functions) and the code from ScalaReflection.scala was copied to KotlinReflection.scala, modifying its behavior to support Kotlin data classes.

To help with the (de)serializing of Kotlin-specific stuff, a schema/predefinedDt argument was added such that the schema (DataType) of a certain type can be generated in Kotlin instead of Scala.
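
To give an idea of what "generating the schema on the Kotlin side" means, here is a heavily simplified sketch (my illustration, not the actual KotlinReflection code) that derives a Spark StructType from a data class using kotlin-reflect:

import kotlin.reflect.KClass
import kotlin.reflect.full.primaryConstructor
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType

// Walk the primary constructor of a data class and map each parameter to a Spark DataType.
// Real code must also handle collections, maps, tuples, enums, nullability of nested types, etc.
fun schemaFor(klass: KClass<*>): StructType =
    DataTypes.createStructType(
        klass.primaryConstructor!!.parameters.map { param ->
            val dataType: DataType = when (param.type.classifier) {
                Int::class -> DataTypes.IntegerType
                Long::class -> DataTypes.LongType
                Double::class -> DataTypes.DoubleType
                Boolean::class -> DataTypes.BooleanType
                String::class -> DataTypes.StringType
                else -> schemaFor(param.type.classifier as KClass<*>) // assume a nested data class
            }
            DataTypes.createStructField(param.name!!, dataType, param.type.isMarkedNullable)
        }
    )

data class Name(val firstName: String, val lastName: String)
data class Person(val name: Name, val age: Int, val city: String?)

fun main() {
    // prints something like struct<name:struct<firstName:string,lastName:string>,age:int,city:string>
    println(schemaFor(Person::class).simpleString())
}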

Well, this worked, but having a large piece of copied internal code in your codebase is bound to cause issues over time. And so it did...

After each major release of Spark it was a large hassle to keep compatibility between KotlinReflection.scala and the rest of Spark, especially since internal calls can change between minor releases and break on a bytecode level. This is why we have so many releases (one for each Scala 2.12/2.13 and minor Spark combo). Plus, if Spark adds a new feature in 3.Z, well, they can just update their ScalaReflection.scala file. We, on the other hand, need to support 3.X, 3.Y, and 3.Z with just one codebase (which we currently do with a preprocessor, but it's not a walk in the park).

Spark 3.4 was the straw that broke the camel's back. ScalaReflection.scala changed file location and a lot changed compared to the previous version. It was a good wake-up call to show that this wasn't the way forward. A simple preprocessor cannot ensure compatibility between these versions anymore, and who knows what else will break in Spark 3.5, or even with Scala 3.

We need a new way to encode Kotlin data classes while maintaining the current flexibility but without relying on internal Spark code. Spark version bumps (even major ones) need to be doable with minor preprocessor changes. (One version of the API for ALL Spark versions is unrealistic, but one for, say, 3.0, 3.1, etc. will probably be fine.)

There are probably several ways to do this:

  • UDT: Make data classes automatically a user-defined type (UDT), either with an @Annotation or using a Kotlin (2.0) compiler plugin; a rough sketch of this route follows this list. Downside: UDTs (afaik) only allow user-defined objects to be serialized inside others, such as Tuples, not as a top-level table-like object. This could be possible, but I don't know how. Second, I don't know how (de)serializing of other known JVM types (such as tuples, Seqs, arrays, etc.) inside data classes would work.
  • Java Bean: Make a compiler plugin that will convert all data classes to something like Person1 at compile time. Top-level should now work, but again I'm not sure about nested types.
  • Other clever Kotlin reflection + Spark magic: Somehow be able to create an Encoder for any data class using reflection without the use of Spark internals or a compiler plugin. This would be the holy grail, but again, no clue how to do that.
  • Maybe even another way. I'm no Spark expert by any means. Maybe this gitbook could provide some inspiration.
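
To make the UDT route a bit more concrete, here is roughly what a hand-written UDT for a simple Kotlin data class looks like (illustrative sketch only; the UserDefinedType API is semi-internal and may change). This is the kind of boilerplate a compiler plugin or annotation processor would have to generate per data class:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.SQLUserDefinedType
import org.apache.spark.sql.types.UserDefinedType

@SQLUserDefinedType(udt = PointUDT::class)
data class Point(val x: Double, val y: Double)

class PointUDT : UserDefinedType<Point>() {
    // The Catalyst representation of a Point: a struct with two non-null doubles.
    override fun sqlType(): DataType = DataTypes.createStructType(
        listOf(
            DataTypes.createStructField("x", DataTypes.DoubleType, false),
            DataTypes.createStructField("y", DataTypes.DoubleType, false),
        )
    )

    override fun serialize(obj: Point): Any = GenericInternalRow(arrayOf<Any?>(obj.x, obj.y))

    override fun deserialize(datum: Any): Point {
        val row = datum as InternalRow
        return Point(row.getDouble(0), row.getDouble(1))
    }

    override fun userClass(): Class<Point> = Point::class.java
}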

Hopefully, this has given you or anyone interested enough inspiration and explanation to give it a try :) If someone can provide a proof-of-concept, I'd be happy to explore it further.

asm0dey (Contributor) commented on Sep 19, 2023

NatElkins commented on Sep 19, 2023

Thank you for the comprehensive answer! I will ponder this for a bit.

I have a strong interest in using Kotlin and Spark together for a work project. It doesn't seem like there is a hard blocker per se (I can always just use the Java API), just that some of the nice-to-haves of this library may not be available unless I contribute a fix.

39 remaining items (not shown)

added this to the 2.0.0 milestone on May 14, 2024
mdsadiqueinam commented on Jun 3, 2024

Sorry to bother, but is there any expected release date for this version?

Jolanrensen (Collaborator, Author) commented on Jun 4, 2024

@mdsadique-inam Sorry for my absence; work called, and KotlinConf and DataFrame had priority.

There are still some (large) issues with notebook support (Spark and Java serialization break everything), so until I can solve that I won't be able to release it.
What I could do, and I plan to do so hopefully soon, is to make a pre-release of this branch #218 so you can try it for yourself and I can gather feedback :).
Unfortunately that also requires some guides/docs etc., and Kotlin DataFrame still has priority for our team, meaning my work on Spark usually happens after work/on the weekends, so I don't dare to put a date on it.

mdsadiqueinam commented on Jun 4, 2024

@mdsadique-inam Sorry for my absence; work called, and KotlinConf and DataFrame had priority.

There are still some (large) issues with notebook support (Spark and Java serialization break everything), so until I can solve that I won't be able to release it.
What I could do, and I plan to do so hopefully soon, is to make a pre-release of this branch #218 so you can try it for yourself and I can gather feedback :).
Unfortunately that also requires some guides/docs etc., and Kotlin DataFrame still has priority for our team, meaning my work on Spark usually happens after work/on the weekends, so I don't dare to put a date on it.

I understand your situation, so I am also willing to contribute here, but I don't know where to start; I am looking into it.

Jolanrensen (Collaborator, Author) commented on Jun 4, 2024

@mdsadique-inam If you're familiar with how Spark serialization works, this is what I'm currently struggling with:
https://gist.github.com/Jolanrensen/7ebcdbd0dc8daf252aa5e14e12d29409
Even without the kotlin-spark-api, running any lambda function in a notebook breaks Spark. I've tried %dumpClassesForSpark (the classes are stored in System.getProperty("spark.repl.class.outputDir")), with or without spark.sql.codegen.wholeStage, using anonymous objects instead of lambdas, @JvmSerializableLambda, but none seem to work. Ideas are welcome :)

Jolanrensen (Collaborator, Author) commented on Jun 8, 2024

Also, did anyone get spark-connect to work, like at all? I'm trying to run it with a sample project with Scala 2.12 (2.13 breaks) and Spark 3.5.1 (or 3.5.0), on Java 8, 11, any combination, but I keep getting NoSuchMethodErrors, even though I literally follow https://spark.apache.org/docs/latest/spark-connect-overview.html#use-spark-connect-in-standalone-applications.
I want to know if I can build something spark-connect-safe, but it looks like spark-connect atm is not stable enough to try :/

Jolanrensen (Collaborator, Author) commented on Jun 15, 2024

Alright! I finally got a working spark-connect connection using the following setup:

Spark-connect server:

  • Spark 3.5.1
  • Scala 2.12
  • Java 1.8
  • not sure if needed, but in spark-defaults.conf:
    • spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
    • spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"

Client:

  • Spark-connect 3.5.1
    • sql api as compileOnly
    • spark-connect client as implementation
  • Scala 2.12/2.13 (both work)
  • JDK 17 (toolchain, gradle, project, sourceCompatibility, the whole shebang)
  • JVM argument "--add-opens=java.base/java.nio=ALL-UNNAMED"

I cannot add a dependency on kotlin-spark-api yet because of clashes between the normal sparkSql and this sqlApi, but let's see if I can make a "watered down" version of the API with the knowledge I have now :)

Watered down version: https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect

Jolanrensen (Collaborator, Author) commented on Jun 18, 2024

Okay, that setup breaks down when you actually try to submit classes in a jar with spark.addArtifact(), because the client runs a newer Java version... back to the drawing board!

Now with an actual dependency on the watered-down Spark API:

Spark-connect server:

  • Spark 3.5.1
  • Scala 2.13/2.12 (must match the client I'm afraid)
  • Java 17 (newest possible for Spark)
  • spark-defaults.conf trick for netty

Spark client:

  • Kotlin 2.0 works
  • Spark-connect 3.5.1
    • sql api as compileOnly
    • spark-connect client as implementation
  • Scala 2.13/2.12 (must match the server) (library as compileOnly on all modules)
  • JDK (1.)8 (sourceCompatibility, targetCompatibility, jvmTarget)
  • Toolchain can be java 17, as long as you use "--add-opens=java.base/java.nio=ALL-UNNAMED"
  • use shadow and runShadow to run the project with spark.addArtifact("path to jar")

working branch: https://github.com/Kotlin/kotlin-spark-api/tree/71f115a9fa0ebec4b44e5bc3857e0fc7bacc190b
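
Roughly, the client build looks like this in Gradle (Kotlin DSL). This is only a sketch of the bullets above; artifact coordinates and plugin versions are from memory, so double-check them:

// build.gradle.kts (sketch)
plugins {
    kotlin("jvm") version "2.0.0"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    // sql api as compileOnly, spark-connect client as implementation
    compileOnly("org.apache.spark:spark-sql-api_2.13:3.5.1")
    implementation("org.apache.spark:spark-connect-client-jvm_2.13:3.5.1")
}

java {
    // target JDK 8 bytecode so the jar shipped via spark.addArtifact() matches the server
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
}

kotlin {
    jvmToolchain(17) // toolchain can be 17 as long as the add-opens flag below is set
    compilerOptions {
        jvmTarget.set(org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8)
    }
}

application {
    mainClass.set("MainKt") // adjust to your own main class
}

tasks.withType<JavaExec> {
    jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED")
}

The app itself then calls spark.addArtifact(...) with the path of the shadow jar, as mentioned above.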

Encoding seems to work, but UDTs don't :(

Changed the title from "Spark 3.4+ support" to "Spark 3.4+ / -Connect support" on Jun 20, 2024
Jolanrensen (Collaborator, Author) commented on Jun 20, 2024

I managed to get spark-connect to work in notebooks too!

https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect

[two screenshots: spark-connect running from a Kotlin notebook]

I modified the integration with Jupyter to add, at the beginning of each cell, a line that sends all created .class files to a class cache accessible by spark-connect :).

Now I need to decide how to go forward:

  • I either drop normal Spark completely and make the kotlin-spark-api use just spark-connect. This is good for compatibility, but bad for getting started quickly, and a lot of things are not supported (yet), like RDDs, UDTs (so I cannot support kotlinx.datetime etc.), UDAFs...
  • Or I support both spark-connect and normal Spark. This is going to be a fun java-preprocessor hell again, resulting in sparkVersions * scalaVersions * useSparkConnect * modules packages... Plus, I couldn't get normal Spark in notebooks to work at all anymore, so that's fun.

@asm0dey Any wise words? :)

leourbina commented on Jul 24, 2024

I'm happy to lend a hand, would love to see this come to fruition.

leourbina commented on Jul 25, 2024

@Jolanrensen Let me know if there's a better communication channel, but happy to chat directly on how I could contribute.

Jolanrensen (Collaborator, Author) commented on Jul 25, 2024

@leourbina I'm always reachable at https://kotlinlang.slack.com/archives/C015B9ZRGJF for anything :) That might be a good place for ideas, discussions, etc. Better there than flooding this issue even more, haha.


Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Participants: @leourbina, @zaleslaw, @asm0dey, @NatElkins, @hawkaa