Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 4 additions & 227 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@

package org.apache.spark.sql

import java.lang.reflect.Modifier

import scala.annotation.implicitNotFound
import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.types._


/**
* :: Experimental ::
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
*
* == Scala ==
* Encoders are generally created automatically through implicits from a `SQLContext`.
* Encoders are generally created automatically through implicits from a `SQLContext`, or can be
* explicitly created by calling static methods on [[Encoders]].
*
* {{{
* import sqlContext.implicits._
Expand Down Expand Up @@ -81,224 +79,3 @@ trait Encoder[T] extends Serializable {
/** A ClassTag that can be used to construct and Array to contain a collection of `T`. */
def clsTag: ClassTag[T]
}

/**
* :: Experimental ::
* Methods for creating an [[Encoder]].
*
* @since 1.6.0
*/
@Experimental
object Encoders {

/**
* An encoder for nullable boolean type.
* @since 1.6.0
*/
def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder()

/**
* An encoder for nullable byte type.
* @since 1.6.0
*/
def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder()

/**
* An encoder for nullable short type.
* @since 1.6.0
*/
def SHORT: Encoder[java.lang.Short] = ExpressionEncoder()

/**
* An encoder for nullable int type.
* @since 1.6.0
*/
def INT: Encoder[java.lang.Integer] = ExpressionEncoder()

/**
* An encoder for nullable long type.
* @since 1.6.0
*/
def LONG: Encoder[java.lang.Long] = ExpressionEncoder()

/**
* An encoder for nullable float type.
* @since 1.6.0
*/
def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder()

/**
* An encoder for nullable double type.
* @since 1.6.0
*/
def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder()

/**
* An encoder for nullable string type.
* @since 1.6.0
*/
def STRING: Encoder[java.lang.String] = ExpressionEncoder()

/**
* An encoder for nullable decimal type.
* @since 1.6.0
*/
def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()

/**
* An encoder for nullable date type.
* @since 1.6.0
*/
def DATE: Encoder[java.sql.Date] = ExpressionEncoder()

/**
* An encoder for nullable timestamp type.
* @since 1.6.0
*/
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()

/**
* An encoder for arrays of bytes.
* @since 1.6.1
*/
def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()

/**
* Creates an encoder for Java Bean of type T.
*
* T must be publicly accessible.
*
* supported types for java bean field:
* - primitive types: boolean, int, double, etc.
* - boxed types: Boolean, Integer, Double, etc.
* - String
* - java.math.BigDecimal
* - time related: java.sql.Date, java.sql.Timestamp
* - collection types: only array and java.util.List currently, map support is in progress
* - nested java bean.
*
* @since 1.6.0
*/
def bean[T](beanClass: Class[T]): Encoder[T] = ExpressionEncoder.javaBean(beanClass)

/**
* (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)

/**
* Creates an encoder that serializes objects of type T using Kryo.
* This encoder maps T into a single byte array (binary) field.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))

/**
* (Scala-specific) Creates an encoder that serializes objects of type T using generic Java
* serialization. This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)

/**
* Creates an encoder that serializes objects of type T using generic Java serialization.
* This encoder maps T into a single byte array (binary) field.
*
* Note that this is extremely inefficient and should only be used as the last resort.
*
* T must be publicly accessible.
*
* @since 1.6.0
*/
def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))

/** Throws an exception if T is not a public class. */
private def validatePublicClass[T: ClassTag](): Unit = {
if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
throw new UnsupportedOperationException(
s"${classTag[T].runtimeClass.getName} is not a public class. " +
"Only public classes are supported.")
}
}

/** A way to construct encoders using generic serializers. */
private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
if (classTag[T].runtimeClass.isPrimitive) {
throw new UnsupportedOperationException("Primitive types are not supported.")
}

validatePublicClass[T]()

ExpressionEncoder[T](
schema = new StructType().add("value", BinaryType),
flat = true,
serializer = Seq(
EncodeUsingSerializer(
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
clsTag = classTag[T]
)
}

/**
* An encoder for 2-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2](
e1: Encoder[T1],
e2: Encoder[T2]): Encoder[(T1, T2)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
}

/**
* An encoder for 3-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
}

/**
* An encoder for 4-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
}

/**
* An encoder for 5-ary tuples.
* @since 1.6.0
*/
def tuple[T1, T2, T3, T4, T5](
e1: Encoder[T1],
e2: Encoder[T2],
e3: Encoder[T3],
e4: Encoder[T4],
e5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
ExpressionEncoder.tuple(
encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4), encoderFor(e5))
}
}
Loading