Spark 3.4+ / -Connect support #195
Seems like ScalaReflection.scala has now deviated too far from KotlinReflection.scala, requiring a major overhaul to keep things functioning. Maybe it's time to try a new approach, such as #178, which would hugely improve maintainability as well as fix most of the compatibility issues we face. This will require time and investment I'm not sure I have alongside my work on DataFrame (especially given the low number of downloads this library currently has). |
Hi. I've just started looking at Spark 3.4 and the first issue we ran into was that we're missing this library. For sure it would be a big win if we could support it. 🙏 |
Please upvote the top comment if you need this! Or comment here, like @hawkaa did. |
I am very interested in keeping this alive as well. |
What would be the next step towards moving forward with this? |
The next step would be to investigate a new way to encode Kotlin data classes (both at the top level of DataFrames and inside columns) and to keep inferring encoders from types without using KotlinReflection.scala, such that the approach is compatible with all versions of Scala and Spark 3.X by default. That way we can keep the API largely the same while improving maintainability, compatibility, and stability. |
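To make the "inferring encoders from types" idea a bit more concrete, here is a minimal, Spark-free sketch of the kind of type dispatch such an approach needs. The function name and the rules are purely illustrative and are not the library's actual logic:

```kotlin
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.full.isSubclassOf
import kotlin.reflect.typeOf

data class ExamplePerson(val name: String, val age: Int) // illustrative type

// Purely illustrative dispatch; requires kotlin-reflect on the classpath.
inline fun <reified T : Any> describeEncoding(): String = describeEncoding(typeOf<T>())

fun describeEncoding(type: KType): String {
    val klass = type.classifier as? KClass<*> ?: return "unsupported type"
    return when {
        klass.isData -> "data class: build a product-style encoder from its primary constructor"
        klass.isSubclassOf(Iterable::class) -> "collection: wrap the element type's encoder"
        else -> "leaf type: look up a predefined encoder"
    }
}

fun main() {
    println(describeEncoding<ExamplePerson>()) // data class branch
    println(describeEncoding<List<Int>>())     // collection branch
    println(describeEncoding<String>())        // leaf branch
}
```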
So if I understood that correctly, we will be able to create a Spark Dataset from a Kotlin DataFrame? That's exactly what I wanted to do, because working with Spark Datasets is not that smooth. Btw, is there currently a workaround for this? |
No, that's currently not on the roadmap. They're two separate projects, although we are exploring interop with other databases in DataFrame (Kotlin/dataframe#408), including Spark. If you want to convert from a Kotlin DataFrame to Spark Datasets, that's actually quite simple:

```kotlin
@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Person> = df.toList().toDS()
}
```

Note that, if you want to be able to convert any Kotlin DataFrame to a Spark `Dataset<Row>`, you can use helper functions like these:

```kotlin
/**
* Converts the DataFrame to a Spark Dataset of Rows using the provided SparkSession and JavaSparkContext.
*
* @param spark The SparkSession object to use for creating the DataFrame.
* @param sc The JavaSparkContext object to use for converting the DataFrame to RDD.
* @return A Dataset of Rows representing the converted DataFrame.
*/
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
return spark.createDataFrame(rows, schema().toSpark())
}
/**
* Converts a DataRow to a Spark Row object.
*
* @return The converted Spark Row.
*/
fun DataRow<*>.toSpark(): Row =
RowFactory.create(
*values().map {
when (it) {
is DataRow<*> -> it.toSpark()
else -> it
}
}.toTypedArray()
)
/**
* Converts a DataFrameSchema to a Spark StructType.
*
* @return The converted Spark StructType.
*/
fun DataFrameSchema.toSpark(): StructType =
DataTypes.createStructType(
columns.map { (name, schema) ->
DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
}
)
/**
* Converts a ColumnSchema object to Spark DataType.
*
* @return The Spark DataType corresponding to the given ColumnSchema object.
* @throws IllegalArgumentException if the column type or kind is unknown.
*/
fun ColumnSchema.toSpark(): DataType =
when (this) {
is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
is ColumnSchema.Group -> schema.toSpark()
is ColumnSchema.Frame -> error("nested dataframes are not supported")
else -> error("unknown column kind: $this")
}
/**
* Returns the corresponding Spark DataType for a given Kotlin type.
*
* @return The Spark DataType that corresponds to the Kotlin type, or null if no matching DataType is found.
*/
fun KType.toSpark(): DataType? = when(this) {
typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
else -> null
}
withSpark {
// Spark Dataset
val sparkDs: DataSet<Row> = df.toSpark(spark, sc)
}
```

Edit: for conversion the other way around, check the wiki: https://github.com/Kotlin/kotlin-spark-api/wiki/Kotlin-DataFrame-interoperability |
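As a rough sketch of that reverse direction for the typed data-class case above (collecting to the driver and letting Kotlin DataFrame infer the schema; this assumes the data fits in driver memory and that Kotlin DataFrame's `Iterable.toDataFrame()` is in scope):

```kotlin
withSpark {
    val sparkDs: Dataset<Person> = df.toList().toDS()

    // Spark -> Kotlin DataFrame, by collecting the typed Dataset back to the driver.
    val kotlinDf: DataFrame<Person> = sparkDs.collectAsList().toDataFrame()
}
```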
@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be? Thank you! |
Sure! But I gotta give a warning: I'm not the original author of the patch, just the maintainer of the rest of the library, so this will be my best understanding of what's going on.

One of the biggest features of the Kotlin Spark API is the automatic recognition and encoding of types as Datasets. Without this, encoders would need to be given explicitly, as is the case for the Java API of Spark. This is the difference between:

```kotlin
spark.createDataset(listOf(1, 2, 3), Encoders.INT())
// and
listOf(1, 2, 3, 4, 5).toDS()
```

or even

```kotlin
data class Person1 @JvmOverloads constructor(
    var name: String? = null,
    var age: Int? = null,
) : Serializable

spark.createDataset(listOf(Person1("A", 1)), Encoders.bean(Person1::class.java))

// and

data class Person2(
    val name: String,
    val age: Int,
)

listOf(Person2("A", 1)).toDS()
```

To do this, we need to automatically generate an encoder based on the typing information provided by the reified type parameter in toDS<>() and the encoder<>() function. If you follow the generateEncoder function in the same file, you can see we'll attempt to create an Encoder from the given KType. This can be either a predefined encoder (from ENCODERS) or some data class, Map, Iterable, Array, UDT, or anything else we can generate an Encoder for on the fly.

An (Expression)Encoder needs two things: an Expression to serialize an object and one to deserialize it. Functions to create these serializers and deserializers exist in ScalaReflection.scala, but they of course can only handle types supported by Spark itself. We want to be able to create (de)serializers for Kotlin data classes, plus Tuples inside data classes, arrays inside tuples inside data classes inside maps, etc. We need the logic in this file but somehow inject extra functionality. At least, that's what @asm0dey likely thought when making it :). The downside is that we keep bumping into Spark-internal functions if we want to call them from the Kotlin side (not even starting about the incompatibilities between Kotlin and Scala). So, a new module was created using the same org.apache.spark.sql package (to be able to call Spark-internal functions), and the code from ScalaReflection.scala was copied to KotlinReflection.scala, modifying its behavior to support Kotlin data classes.

To help with the (de)serializing of Kotlin-specific stuff, a schema/predefinedDt argument was added such that the schema (DataType) of a certain type can be generated in Kotlin instead of Scala.

Well, this worked, but having a large piece of copied internal code in your codebase is bound to cause some issues over time. And so it did... After each major release of Spark it was a large hassle to keep compatibility between KotlinReflection.scala and the rest of Spark, especially since internal calls can change between minor releases and break on a bytecode level. This is why we have so many releases (one for each Scala 2.12/2.13 and minor Spark combo). Plus, if Spark adds a new feature to 3.Z, well, they can just update their ScalaReflection.scala file. We, on the other hand, need to support 3.X, 3.Y, ánd 3.Z with just one codebase (which we currently do with a preprocessor, https://github.com/raydac/java-comment-preprocessor, but it's not a walk in the park).

Spark 3.4 was the straw that broke the camel's back. ScalaReflection.scala changed file location and a lot changed since the last version. It was a good wake-up call to show that this wasn't the way forward. A simple preprocessor cannot ensure compatibility between these versions anymore, and who knows what else will break in Spark 3.5, or with Scala 3 even.

We need a new way to encode Kotlin data classes while maintaining the current flexibility but without relying on internal Spark code. Spark version bumps (even major ones) need to be doable with minor preprocessor changes. (One version of the API for ALL Spark versions is unrealistic, but one for, say, 3.0, 3.1, etc. will probably be fine.)

There are probably several ways to do this:

- UDT: Make data classes automatically a user-defined type, either with an @annotation or using a Kotlin (2.0) compiler plugin. Downside: UDTs (afaik) only allow user-defined objects to be serialized inside others, such as Tuples, not as a top-level table-like object. This could be possible, but I don't know how. Second, I don't know how (de)serializing of other known JVM types (such as tuples, Seqs, arrays, etc.) inside data classes would work.
- Java Bean: Make a compiler plugin that converts all data classes to something like Person1 at compile time. Top-level should now work, but again I'm not sure about nested types.
- Other clever Kotlin reflection + Spark magic: Somehow be able to create an Encoder for any data class using reflection, without the use of Spark internals or a compiler plugin. This would be the holy grail, but again, no clue how to do that.
- Maybe even another way. I'm no Spark expert by any means. Maybe this gitbook could provide some inspiration: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExpressionEncoder.html
Hopefully, this has given you or anyone interested enough inspiration and explanation to give it a try :) If someone can provide a proof-of-concept, I'd be happy to explore it further. |
I'm happy to provide any additional guidance if needed, as an author of the original implementation (but probably Jolan knows it better than me already :)
|
Thank you for the comprehensive answer! I will ponder this for a bit. I have a strong interest in using Kotlin and Spark together for a work project. It doesn't seem like there is a hard blocker per se (I can always just use the Java API), just that some of the nice-to-haves of this library may not be available unless I contribute a fix. |
@Jolanrensen we should probably take a look at the connect API: https://spark.apache.org/docs/latest/spark-connect-overview.html |
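For context, a standalone JVM client talks to a Spark Connect server roughly like this. The artifact name and builder calls below follow the linked overview page as I recall it, so treat them as assumptions and double-check against the docs for your Spark version:

```kotlin
// Assumed client dependency: org.apache.spark:spark-connect-client-jvm_2.12 (same version as the server),
// with the server started via sbin/start-connect-server.sh.
import org.apache.spark.sql.SparkSession

fun main() {
    val spark = SparkSession.builder()
        .remote("sc://localhost")   // Spark Connect endpoint instead of a local master
        .build()                    // newer client versions use create()/getOrCreate() instead

    spark.sql("SELECT 1 AS one").show()
    spark.close()
}
```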
Allowing the Spark driver to run a different version than the application code might indeed solve a lot of problems for us! |
I can imagine that we'd generate POJOs from the used data classes at compile time. I know you are doing something similar in DataFrame, but I don't have any idea how to implement it :) |
@asm0dey A compiler plugin could do that :) |
Hi, do you have any estimate on when (or if) 3.4+ will be supported? |
@gregfriis I'm sorry, no, we currently don't have the resources to figure that out. What could help is if someone from the community could provide a proof of concept solution. That way we could better weigh the time/cost to properly support it. |
Small weekend/hobby update regarding the issue: I tried Spark Connect but locally on my machine I couldn't get it to work reliably yet. Plus it requires running Spark locally with some special script, so for now, that's not ideal. I did experiment with the potential compiler plugin route and I do have a hunch that it might be possible :). It does require some special configuration and modules, but it should be doable. The basic angle is: Converting annotated Kotlin data classes to something Spark sees as a Scala case class. This can automatically provide us with all supported (and even nested) types without having to traverse the classes ourselves. In theory, this is not that difficult, but it comes with a few gotchas:
To recreate my experiment, we need:

**Scala module with type retrieval**

Stolen from Spark's reflection code:

```scala
object TypeRetrieval {
val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
def getType[T](clazz: Class[T]): universe.Type = {
clazz match {
// not sure about this line, without it, no spark dependencies are needed
case _ if clazz == classOf[Array[Byte]] => localTypeOf[Array[Byte]]
case _ => {
val mir = runtimeMirror(clazz.getClassLoader)
mir.classSymbol(clazz).toType
}
}
}
}
```

**Kotlin module depending on the Scala module**

This little helper function can create a Spark Encoder:

```kotlin
inline fun <reified T : Any> encoderFor(): Encoder<T> = encoderFor(T::class.java)
fun <T> encoderFor(klass: Class<T>): Encoder<T> =
ExpressionEncoder.apply(
ScalaReflection.encoderFor(
TypeRetrieval.getType(klass),
false
) as AgnosticEncoder<T>
    )
```

Next, to limit the amount of generated code, we can create a little bridge from data classes to Scala's Product:

```kotlin
abstract class DataClassProduct(private val klass: KClass<*>) : Product {
override fun productPrefix(): String = klass.simpleName!!
private val params
get() = klass
.primaryConstructor!!
.parameters
.withIndex()
.associate { (i, it) -> i to it }
override fun canEqual(that: Any?): Boolean = that!!::class == klass
override fun productElement(n: Int): Any = params[n]?.let { param ->
klass.declaredMemberProperties
.firstOrNull { it.name == param.name }
?.call(this)
} ?: throw IndexOutOfBoundsException(n.toString())
override fun productArity(): Int = params.size
override fun productElementNames(): Iterator<String> = CollectionConverters.asScala(
iterator {
for (param in params.values) {
yield(param.name!!)
}
}
)
}
```

**Writing actual Kotlin data classes**

NOTE: we need the javaParameters compiler option:

```kotlin
kotlin {
jvmToolchain(8)
compilerOptions {
javaParameters = true
}
}
```

Let's say we now want to encode a simple data class. We could just write:

```kotlin
@SomeSparkAnnotation
data class Person(
val name: String,
val age: Int,
val hobbies: List<String>,
val address: Pair<String, Int>,
)
```

**What the compiler plugin would produce**

And then the compiler plugin could convert this to:

```kotlin
data class Person(
val name: String,
val age: Int,
val hobbies: scala.collection.immutable.List<String>, // converting java.util.List to scala.collection.immutable.List
val address: scala.Tuple2<String, Int>, // converting kotlin.Pair to scala.Tuple2
): DataClassProduct(Person::class) {
// accessors for Spark
fun name() = this.name
fun age() = this.age
fun hobbies() = this.hobbies
fun address() = this.address
companion object {
// so we can still create it normally from kotlin
operator fun invoke(
name: String,
age: Int,
hobbies: List<String>,
address: Pair<String, Int>,
): Person =
Person(
name = name,
age = age,
hobbies = CollectionConverters.asScala(hobbies).toList(),
address = address.toTuple() // from scalaTuplesInKotlin
)
}
}
```

Running this with Spark will work correctly:

```kotlin
val test = Person(
name = "Jolan",
age = 22,
hobbies = listOf("Programming", "Gaming"),
address = "Kerkstraat" to 1
)
spark.createDataset(listOf(test), encoderFor<Person>()).show()
// +-----+---+--------------------+---------------+
// | name|age| hobbies| address|
// +-----+---+--------------------+---------------+
// |Jolan| 22|[Programming, Gam...|{Kerkstraat, 1}|
// +-----+---+--------------------+---------------+
```

But, we might need something more advanced, because calling … It does have some promise though, as nested data classes like these even work :), plus we don't need to hack into the Spark source code anymore.

**How to go further**

We could try to generate a … (I do have some issues calling the deserializer on an encoded row with the …):

```kotlin
private val encoder = encoderFor(Person_::class.java)
private val serializer = encoder.createSerializer()
private val deserializer = encoder
    .resolveAndBind(
        DataTypeUtils.toAttributes(encoder.schema()) as Seq<Attribute>,
        `SimpleAnalyzer$`.`MODULE$`,
    )
    .createDeserializer()
```

And of course, we'd need to actually build the compiler plugin. This is tricky and requires Kotlin 2.0. |
Hey, this is just awesome research, thank you! I think it makes more sense to compile to Java POJOs rather than to case classes. POJOs are natively supported by Spark. |
@asm0dey You mean using something like this?

```kotlin
data class AddressJava @JvmOverloads constructor(
var street: String = "",
var city: String = "",
) : Serializable
data class PersonJava @JvmOverloads constructor(
var name: String = "",
var age: Int = -1,
var tupleTest: Tuple2<AddressJava, Int>? = null,
var listTest: List<AddressJava> = emptyList(),
) : Serializable
...
val data = listOf(
PersonJava("Jolan", 25, Tuple2(AddressJava("Street", "City"), 5), listOf(AddressJava("Street", "City"))),
)
val encoder = Encoders.bean(PersonJava::class.java)
val df = spark.createDataset(data, encoder)
df.show()
// +---+----------------+-----+---------+
// |age| listTest| name|tupleTest|
// +---+----------------+-----+---------+
// | 25|[{City, Street}]|Jolan| {}|
// +---+----------------+-----+---------+
```

And calling …
This is something that is possible in the current version of the Kotlin Spark API, so it seems a bit harsh to break that. We could extend |
Huh, right, I forgot about the tuple support. Is it possible, and necessary, to support default argument values? It seems that the backend should not care at all, because at runtime they are already known. |
Java bean support requires an empty constructor + getters/setters, so yeah :/. That's what … Actually, we can do it with … |
The no-arg plugin already adds a no-arg constructor to data classes without default arguments, see https://kotlinlang.org/docs/no-arg-plugin.html. You can even make it work on your own custom annotation. It doesn't solve the nested-tuples feature, though. |
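For reference, pointing the no-arg plugin at a custom annotation is just a bit of Gradle configuration; the annotation FQN and versions below are placeholders:

```kotlin
// build.gradle.kts
plugins {
    kotlin("jvm") version "1.9.23"
    kotlin("plugin.noarg") version "1.9.23" // must match the Kotlin version
}

noArg {
    // Every class annotated with this (hypothetical) annotation gets a synthetic no-arg constructor.
    annotation("org.example.SomeSparkAnnotation")
}
```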
2 updates:

First: We can successfully define a UDT for a class, (en/de)coding it using the … The downside of this approach is: no real column creation.
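As an illustration of what such a binary-blob UDT can look like, here is a hedged sketch using plain Java serialization and a made-up example class (not the actual experiment); it also shows why everything ends up in a single binary column:

```kotlin
import java.io.*
import org.apache.spark.sql.types.*

// Hypothetical example class, not from the original experiment.
data class Coordinate(val x: Double, val y: Double) : Serializable

// Stores the whole object as one BinaryType blob, hence "no real column creation".
class CoordinateUDT : UserDefinedType<Coordinate>() {
    override fun sqlType(): DataType = DataTypes.BinaryType

    override fun serialize(obj: Coordinate): Any =
        ByteArrayOutputStream().also { bos ->
            ObjectOutputStream(bos).use { it.writeObject(obj) }
        }.toByteArray()

    override fun deserialize(datum: Any): Coordinate =
        ObjectInputStream(ByteArrayInputStream(datum as ByteArray)).use { it.readObject() as Coordinate }

    override fun userClass(): Class<Coordinate> = Coordinate::class.java
}

// Register by class name so Spark picks the UDT up when it encounters Coordinate.
fun registerCoordinateUdt() =
    UDTRegistration.register(Coordinate::class.java.name, CoordinateUDT::class.java.name)
```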
Second: I tweaked the … Given:

```kotlin
@SomeAnnotation
data class Address(
val streetAndNumber: Pair<String, Int>,
val city: String,
)
```

We generate:

```kotlin
class Address( // main constructor with scala names/types
streetAndNumber: Tuple2<String, Int>,
city: String,
) : Product {
// secondary in companion object with kotlin names/types
// Mainly, so Spark doesn't see this constructor
companion object {
operator fun invoke(
streetAndNumber: Pair<String, Int>,
city: String,
): Address = Address(
streetAndNumber = streetAndNumber.toTuple(),
city = city,
)
}
private val scalaStreetAndNumber = streetAndNumber
private val scalaCity = city
// For Scala
fun streetAndNumber() = scalaStreetAndNumber
fun city() = scalaCity
// For Kotlin
val streetAndNumber = scalaStreetAndNumber.toPair()
val city = scalaCity
// Product functions
override fun canEqual(that: Any?): Boolean = that is Address
override fun productElement(n: Int): Any = when (n) {
0 -> scalaStreetAndNumber
1 -> scalaCity
else -> throw IndexOutOfBoundsException(n.toString())
}
override fun productArity(): Int = 2
// data class functions
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Address) return false
if (streetAndNumber != other.streetAndNumber) return false
if (city != other.city) return false
return true
}
override fun hashCode(): Int {
var result = streetAndNumber.hashCode()
result = 31 * result + city.hashCode()
return result
}
fun copy(
streetAndNumber: Pair<String, Int> = this.streetAndNumber,
city: String = this.city,
): Address = Address(
streetAndNumber = streetAndNumber,
city = city,
)
override fun toString(): String = "Address(streetAndNumber=$streetAndNumber, city='$city')"
}
```

Even when nested, the … The plugin does need to create these converters: …
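Converters of this kind are conceptually tiny. For instance, a Pair/Tuple2 pair, mirroring the `toTuple()`/`toPair()` helpers from scalaTuplesInKotlin used above (a sketch, assuming scala-library is on the classpath):

```kotlin
import scala.Tuple2

// Bridge helpers between Kotlin and Scala representations at the constructor/accessor boundary.
fun <A, B> Pair<A, B>.toTuple(): Tuple2<A, B> = Tuple2(first, second)
fun <A, B> Tuple2<A, B>.toPair(): Pair<A, B> = Pair(_1(), _2())
```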
The rest should be good to go.

Edit: Okay, nullability detection is not really there:

```kotlin
encoderFor<Address>().schema().printTreeString()
// root
// |-- streetAndNumber: struct (nullable = true)
// | |-- _1: string (nullable = true)
// | |-- _2: integer (nullable = true)
// |-- city: string (nullable = true)
```

I'm not sure how to convey these to Scala correctly. In … |
Wow, man, this sounds stunning! How? |
Well, if it walks like a … |
IIRC the reason why I created it in that package was that Spark could only read it from that package :) I would never do it without a serious reason. Also, I'm not sure if you can do it in Kotlin: it probably relies on Scala-specific reflection, but I might be wrong here. If it's possible to implement it in Kotlin and outside Spark's packages, we won't be affected by changes in Spark anymore, which would bring the maintenance down to a bearable amount. |
I'm trying the KotlinTypeInference route right now. It seems that most things work fine with Kotlin reflection since, like here, the actual encoders currently live somewhere else. This means we can add both Scala encoders and Java encoders, as well as a special product-like encoder for data classes. I'm not sure how backwards compatible that is, but it seems like the cleanest custom solution for retrieving encoders for Kotlin. We cannot go fully custom with our encoders, as only the ones used here are compatible, but hey, we can try. |
It was never our goal to go fully custom; we should support a sensible subset of possibilities. Looking at it with a fresh eye, I'm starting to think that we actually don't need all these Scala intricacies: we only need to support the Kotlin stuff and be smart about delegating everything else. The most complex parts of the support, encoders for data classes and recursive generics, are already written and should probably be reused. |
Update time :) I've been trying to make my own mixed version of JavaTypeInference and ScalaReflection in pure Kotlin, with success! This will allow us to create a complete replacement of the old encoding system, with data classes supported out of the box :). (I did need one dirty hack to get the names of the columns right for data classes, but I'll get to that later haha.) One downside of this approach is that it's definitely Spark 3.4+. This is because they switched their encoding system for Spark Connect: instead of defining a (de)serializer for each type, like before, they define intermediate "AgnosticEncoder" objects, which later get (de)serializers generated when building an ExpressionEncoder. This meant I had to use ProductEncoder for Kotlin data classes, since it is the only encoder that works by reading the values and instantiating a new instance of any class by calling the constructor with all arguments at once. I'll try to make a new branch in this repo with this approach and see if we can replace the encoding with the new system. This should allow us to drop the :core module entirely and be more Spark-version-independent (in theory). |
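To make the ProductEncoder remark concrete: the reason it fits Kotlin data classes is that a data class can be read property-by-property and rebuilt through its primary constructor in one call. A toy, Spark-free illustration of that round trip (requires kotlin-reflect; not the actual encoder code):

```kotlin
import kotlin.reflect.KClass
import kotlin.reflect.full.memberProperties
import kotlin.reflect.full.primaryConstructor

data class DemoUser(val name: String, val age: Int) // illustrative type

// "Serialize" by reading each constructor parameter's backing property, in declaration order.
fun toRow(value: Any): List<Any?> {
    val klass = value::class
    val properties = klass.memberProperties.associateBy { it.name }
    return klass.primaryConstructor!!.parameters.map { properties.getValue(it.name!!).getter.call(value) }
}

// "Deserialize" by calling the primary constructor with all arguments at once.
fun <T : Any> fromRow(klass: KClass<T>, row: List<Any?>): T =
    klass.primaryConstructor!!.call(*row.toTypedArray())

fun main() {
    val row = toRow(DemoUser("Jolan", 22))  // [Jolan, 22]
    println(fromRow(DemoUser::class, row))  // DemoUser(name=Jolan, age=22)
}
```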
And it would also probably significantly reduce, if not remove, the code preprocessing!
|
AFAIR it should be totally possible to write our own implementation of data class instantiation. I'll try to look at it a bit later |
@asm0dey I thought so too, but if we use AgnosticEncoders (which provide Spark Connect compatibility, as far as I know), we can only use the encoders given to us. |
You can check out #218 to see my progress :). All other Spark helpers should be doable in pure Kotlin (as I did with …). The encoders are still incomplete, so there are still many failing tests, but we can start experimenting with what does and doesn't work on that branch. |
One of the main problems I currently have is with the encoding of data class names using the … One way to tackle this is by marking your data classes like:

```kotlin
data class User(@get:JvmName("name") val name: String, @get:JvmName("age") val age: Int)
```

which sets the JVM function names of the property getters. However, this doesn't work for data classes we don't have access to, like … My name hack tries to override the … So yeah... ideas are welcome. You can turn it off by setting …

@asm0dey Maybe a little compiler plugin after all? :) At least for manual, user-created data classes. |
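To see concretely what the `@get:JvmName` trick changes at the JVM level (an illustrative check, not part of the API):

```kotlin
data class Plain(val name: String)                        // getter exposed to the JVM as getName()
data class Renamed(@get:JvmName("name") val name: String) // getter exposed to the JVM as name()

fun main() {
    println(Plain::class.java.declaredMethods.map { it.name })   // contains "getName"
    println(Renamed::class.java.declaredMethods.map { it.name }) // contains "name", like a Scala-style accessor
}
```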
Update: I managed to create a little compiler plugin that, in the IR stage, adds … Come to think of it 🤔 I should check whether the … Plus, we might be able to add something like … Finally, I need to take a look at … |
Most tests are now fixed at … There's just one place that now fails, and that is when returning a … |
They're a hassle to make, but compiler plugins are awesome :) The compiler plugin now converts:

```kotlin
@Sparkify
data class User(
val name: String,
@ColumnName("test") val age: Int,
)
```

in IR successfully to:

```kotlin
@Sparkify
data class User(
@get:JvmName("name") val name: String,
@get:JvmName("test") @ColumnName("test") val age: Int,
): scala.Product {
override fun canEqual(that: Any?): Boolean = that is User
override fun productArity(): Int = 2
override fun productElement(n: Int): Any? =
if (n == 0) this.name
else if (n == 1) this.age
else throw IndexOutOfBoundsException()
}
```

This is Scala-version-independent, since it just queries the classpath for … |
Sorry to bother you, but is there any expected release date for this version? |
@mdsadique-inam Sorry for my absence, work called and KotlinConf and DataFrame had priority. There are still some (large) issues with notebook support (Spark and java serialization breaks everything), so until I can solve that I won't be able to release it. |
I understand your situation, and I am therefore willing to contribute here, but I don't know where to start. I am looking into it. |
@mdsadique-inam If you're familiar with how Spark serialization works, this is what I'm currently struggling with: … |
Also, did anyone get spark-connect to work, like at all? I'm trying to run it with a sample project with scala 2.12 (2.13 breaks) and spark 3.5.1 (or 3.5.0), on java 8, 11, any combination, but I keep getting NoSuchMethodErrors. Even though I literally follow https://spark.apache.org/docs/latest/spark-connect-overview.html#use-spark-connect-in-standalone-applications. |
Alright! I finally got a working spark-connect connection using the following setup:

Spark-connect server: …
Client: …
I cannot add a dependency on kotlin-spark-api yet because of clashes between the normal sparkSql and this sqlApi, but let's see if I can make a "watered-down" version of the API with the knowledge I have now :)

Watered-down version: https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect |
Okay, that setup breaks down when you actually try to submit classes in a jar with … Now with an actual dependency on the watered-down Spark API:

Spark-connect server: …
Spark client: …
Working branch: https://github.com/Kotlin/kotlin-spark-api/tree/71f115a9fa0ebec4b44e5bc3857e0fc7bacc190b

Encoding seems to work, but UDTs don't :( |
I managed to get spark-connect to work in notebooks too! https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect I modified the Jupyter integration to add, at the beginning of each cell, a line that sends all created .class files to a class cache accessible by spark-connect :). Now I need to decide how to go forward: …
@asm0dey Any wise words? :) |
I'm happy to lend a hand, would love to see this come to fruition. |
@Jolanrensen Let me know if there's a better communication channel, but happy to chat directly on how I could contribute. |
@leourbina I'm always reachable at https://kotlinlang.slack.com/archives/C015B9ZRGJF for anything :) That might be a good place for ideas, discussions, etc. Better there than to flood this issue even more haha |
https://spark.apache.org/news/spark-3-4-0-released.html