Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49415][CONNECT][SQL] Move SQLImplicits to sql/api #48151

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,283 +16,8 @@
*/
package org.apache.spark.sql

import scala.collection.Map
import scala.language.implicitConversions
import scala.reflect.classTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._

/**
* A collection of implicit methods for converting names and Symbols into [[Column]]s, and for
* converting common Scala objects into [[Dataset]]s.
*
* @since 3.4.0
*/
abstract class SQLImplicits private[sql] (session: SparkSession) extends LowPrioritySQLImplicits {

/**
* Converts $"col name" into a [[Column]].
*
* @since 3.4.0
*/
implicit class StringToColumn(val sc: StringContext) {
def $(args: Any*): ColumnName = {
new ColumnName(sc.s(args: _*))
}
}

/**
* An implicit conversion that turns a Scala `Symbol` into a [[Column]].
* @since 3.4.0
*/
implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)

/** @since 3.4.0 */
implicit val newIntEncoder: Encoder[Int] = PrimitiveIntEncoder

/** @since 3.4.0 */
implicit val newLongEncoder: Encoder[Long] = PrimitiveLongEncoder

/** @since 3.4.0 */
implicit val newDoubleEncoder: Encoder[Double] = PrimitiveDoubleEncoder

/** @since 3.4.0 */
implicit val newFloatEncoder: Encoder[Float] = PrimitiveFloatEncoder

/** @since 3.4.0 */
implicit val newByteEncoder: Encoder[Byte] = PrimitiveByteEncoder

/** @since 3.4.0 */
implicit val newShortEncoder: Encoder[Short] = PrimitiveShortEncoder

/** @since 3.4.0 */
implicit val newBooleanEncoder: Encoder[Boolean] = PrimitiveBooleanEncoder

/** @since 3.4.0 */
implicit val newStringEncoder: Encoder[String] = StringEncoder

/** @since 3.4.0 */
implicit val newJavaDecimalEncoder: Encoder[java.math.BigDecimal] =
AgnosticEncoders.DEFAULT_JAVA_DECIMAL_ENCODER

/** @since 3.4.0 */
implicit val newScalaDecimalEncoder: Encoder[scala.math.BigDecimal] =
AgnosticEncoders.DEFAULT_SCALA_DECIMAL_ENCODER

/** @since 3.4.0 */
implicit val newDateEncoder: Encoder[java.sql.Date] = AgnosticEncoders.STRICT_DATE_ENCODER

/** @since 3.4.0 */
implicit val newLocalDateEncoder: Encoder[java.time.LocalDate] =
AgnosticEncoders.STRICT_LOCAL_DATE_ENCODER

/** @since 3.4.0 */
implicit val newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] =
AgnosticEncoders.LocalDateTimeEncoder

/** @since 3.4.0 */
implicit val newTimeStampEncoder: Encoder[java.sql.Timestamp] =
AgnosticEncoders.STRICT_TIMESTAMP_ENCODER

/** @since 3.4.0 */
implicit val newInstantEncoder: Encoder[java.time.Instant] =
AgnosticEncoders.STRICT_INSTANT_ENCODER

/** @since 3.4.0 */
implicit val newDurationEncoder: Encoder[java.time.Duration] = DayTimeIntervalEncoder

/** @since 3.4.0 */
implicit val newPeriodEncoder: Encoder[java.time.Period] = YearMonthIntervalEncoder

/** @since 3.4.0 */
implicit def newJavaEnumEncoder[A <: java.lang.Enum[_]: TypeTag]: Encoder[A] = {
ScalaReflection.encoderFor[A]
}

// Boxed primitives

/** @since 3.4.0 */
implicit val newBoxedIntEncoder: Encoder[java.lang.Integer] = BoxedIntEncoder

/** @since 3.4.0 */
implicit val newBoxedLongEncoder: Encoder[java.lang.Long] = BoxedLongEncoder

/** @since 3.4.0 */
implicit val newBoxedDoubleEncoder: Encoder[java.lang.Double] = BoxedDoubleEncoder

/** @since 3.4.0 */
implicit val newBoxedFloatEncoder: Encoder[java.lang.Float] = BoxedFloatEncoder

/** @since 3.4.0 */
implicit val newBoxedByteEncoder: Encoder[java.lang.Byte] = BoxedByteEncoder

/** @since 3.4.0 */
implicit val newBoxedShortEncoder: Encoder[java.lang.Short] = BoxedShortEncoder

/** @since 3.4.0 */
implicit val newBoxedBooleanEncoder: Encoder[java.lang.Boolean] = BoxedBooleanEncoder

// Seqs
private def newSeqEncoder[E](elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Seq[E]] = {
IterableEncoder(
classTag[Seq[E]],
elementEncoder,
elementEncoder.nullable,
elementEncoder.lenientSerialization)
}

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newIntSeqEncoder: Encoder[Seq[Int]] = newSeqEncoder(PrimitiveIntEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newLongSeqEncoder: Encoder[Seq[Long]] = newSeqEncoder(PrimitiveLongEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newDoubleSeqEncoder: Encoder[Seq[Double]] = newSeqEncoder(PrimitiveDoubleEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newFloatSeqEncoder: Encoder[Seq[Float]] = newSeqEncoder(PrimitiveFloatEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newByteSeqEncoder: Encoder[Seq[Byte]] = newSeqEncoder(PrimitiveByteEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newShortSeqEncoder: Encoder[Seq[Short]] = newSeqEncoder(PrimitiveShortEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newBooleanSeqEncoder: Encoder[Seq[Boolean]] = newSeqEncoder(PrimitiveBooleanEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
val newStringSeqEncoder: Encoder[Seq[String]] = newSeqEncoder(StringEncoder)

/**
* @since 3.4.0
* @deprecated
* use [[newSequenceEncoder]]
*/
@deprecated("Use newSequenceEncoder instead", "2.2.0")
def newProductSeqEncoder[A <: Product: TypeTag]: Encoder[Seq[A]] =
newSeqEncoder(ScalaReflection.encoderFor[A])

/** @since 3.4.0 */
implicit def newSequenceEncoder[T <: Seq[_]: TypeTag]: Encoder[T] =
ScalaReflection.encoderFor[T]

// Maps
/** @since 3.4.0 */
implicit def newMapEncoder[T <: Map[_, _]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T]

/**
* Notice that we serialize `Set` to Catalyst array. The set property is only kept when
* manipulating the domain objects. The serialization format doesn't keep the set property. When
* we have a Catalyst array which contains duplicated elements and convert it to
* `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated.
*
* @since 3.4.0
*/
implicit def newSetEncoder[T <: Set[_]: TypeTag]: Encoder[T] = ScalaReflection.encoderFor[T]

// Arrays
private def newArrayEncoder[E](
elementEncoder: AgnosticEncoder[E]): AgnosticEncoder[Array[E]] = {
ArrayEncoder(elementEncoder, elementEncoder.nullable)
}

/** @since 3.4.0 */
implicit val newIntArrayEncoder: Encoder[Array[Int]] = newArrayEncoder(PrimitiveIntEncoder)

/** @since 3.4.0 */
implicit val newLongArrayEncoder: Encoder[Array[Long]] = newArrayEncoder(PrimitiveLongEncoder)

/** @since 3.4.0 */
implicit val newDoubleArrayEncoder: Encoder[Array[Double]] =
newArrayEncoder(PrimitiveDoubleEncoder)

/** @since 3.4.0 */
implicit val newFloatArrayEncoder: Encoder[Array[Float]] = newArrayEncoder(
PrimitiveFloatEncoder)

/** @since 3.4.0 */
implicit val newByteArrayEncoder: Encoder[Array[Byte]] = BinaryEncoder

/** @since 3.4.0 */
implicit val newShortArrayEncoder: Encoder[Array[Short]] = newArrayEncoder(
PrimitiveShortEncoder)

/** @since 3.4.0 */
implicit val newBooleanArrayEncoder: Encoder[Array[Boolean]] =
newArrayEncoder(PrimitiveBooleanEncoder)

/** @since 3.4.0 */
implicit val newStringArrayEncoder: Encoder[Array[String]] = newArrayEncoder(StringEncoder)

/** @since 3.4.0 */
implicit def newProductArrayEncoder[A <: Product: TypeTag]: Encoder[Array[A]] = {
newArrayEncoder(ScalaReflection.encoderFor[A])
}

/**
* Creates a [[Dataset]] from a local Seq.
* @since 3.4.0
*/
implicit def localSeqToDatasetHolder[T: Encoder](s: Seq[T]): DatasetHolder[T] = {
DatasetHolder(session.createDataset(s))
}
}

/**
* Lower priority implicit methods for converting Scala objects into [[Dataset]]s. Conflicting
* implicits are placed here to disambiguate resolution.
*
* Reasons for including specific implicits: newProductEncoder - to disambiguate for `List`s which
* are both `Seq` and `Product`
*/
trait LowPrioritySQLImplicits {

/** @since 3.4.0 */
implicit def newProductEncoder[T <: Product: TypeTag]: Encoder[T] =
ScalaReflection.encoderFor[T]
/** @inheritdoc */
abstract class SQLImplicits private[sql] (override val session: SparkSession)
extends api.SQLImplicits {
type DS[U] = Dataset[U]
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,8 @@ class SparkSession private[sql] (
lazy val udf: UDFRegistration = new UDFRegistration(this)

// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/**
* (Scala-specific) Implicit methods available in Scala for converting common names and Symbols
* into [[Column]]s, and for converting common Scala objects into DataFrame`s.
*
* {{{
* val sparkSession = SparkSession.builder.getOrCreate()
* import sparkSession.implicits._
* }}}
*
* @since 3.4.0
*/
object implicits extends SQLImplicits(this) with Serializable
/** @inheritdoc */
object implicits extends SQLImplicits(this)
// scalastyle:on

/** @inheritdoc */
Expand Down
12 changes: 12 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryStatus"),

// SPARK-49415: Shared SQLImplicits.
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DatasetHolder"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DatasetHolder$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.LowPrioritySQLImplicits"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SQLContext$implicits$"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SQLImplicits"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLImplicits.StringToColumn"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.this"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLImplicits$StringToColumn"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$implicits$"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.SQLImplicits.session"),
)

// Default exclude rules
Expand Down
Loading