Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
</th>
<th>
<span data-toggle="tooltip" data-placement="right" title="Started time of this application.">
Started
Started (GMT)
</span>
</th>
<th>
<span data-toggle="tooltip" data-placement="above" title="The completed time of this application.">
Completed
Completed (GMT)
</span>
</th>
<th>
Expand All @@ -56,7 +56,7 @@
</th>
<th>
<span data-toggle="tooltip" data-placement="above" title="The timestamp of the last updating on this application">
Last Updated
Last Updated (GMT)
</span>
</th>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
Expand Down Expand Up @@ -403,6 +407,20 @@ object ScalaReflection extends ScalaReflection {
} else {
newInstance
}

case other =>

val obj = NewInstance(
classOf[KryoSerializer],
new SparkConf :: Nil,
dataType = ObjectType(classOf[KryoSerializer])
)


val kryoSerializer = Encoders.kryo(getPath.getClass)
.asInstanceOf[ExpressionEncoder[_]]
kryoSerializer.deserializer

}
}

Expand Down Expand Up @@ -582,9 +600,35 @@ object ScalaReflection extends ScalaReflection {
val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)


case other =>
throw new UnsupportedOperationException(
s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n"))
val kryoSerializer = Encoders.kryo(inputObject.getClass)
.asInstanceOf[ExpressionEncoder[_]]
kryoSerializer.serializer.head

// val obj = NewInstance(
// KryoSer
//
// )
// val kryoEncoder = Encoders.kryo(inputObject.dataType.getClass)


// Invoke(kryoEncoder, "serialize", BinaryType)
//
// val obj = new KryoSerializer(new SparkConf).newInstance()
//
// val obj = NewInstance(
// classOf[KryoSerializer],
// new SparkConf :: Nil,
// classOf[KryoSerializer]
// )
//
// Invoke(obj, "serialize", BinaryType, inputObject :: Nil)
//
// implicit
// inputObjectect
// throw new UnsupportedOperationException(
// s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n"))
}

}
Expand Down Expand Up @@ -701,7 +745,8 @@ object ScalaReflection extends ScalaReflection {
StructField(fieldName, dataType, nullable)
}), nullable = true)
case other =>
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
Schema(BinaryType, nullable = false)
// throw new UnsupportedOperationException(s"Schema for type $other is not supported")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,20 @@ case class ExpressionEncoder[T](
// (intermediate value is not an attribute). We assume that all serializer expressions use a same
// `BoundReference` to refer to the object, and throw exception if they don't.
assert(serializer.forall(_.references.isEmpty), "serializer cannot reference to any attributes.")

val x = serializer.flatMap { ser =>
val boundRefs = ser.collect { case b: BoundReference => b }
assert(boundRefs.nonEmpty,
"each serializer expression should contains at least one `BoundReference`")
boundRefs
}

assert(serializer.flatMap { ser =>
val boundRefs = ser.collect { case b: BoundReference => b }
assert(boundRefs.nonEmpty,
"each serializer expression should contains at least one `BoundReference`")
boundRefs
}.distinct.length <= 1, "all serializer expressions must use the same BoundReference.")
}.distinct.length <= 2, "all serializer expressions must use the same BoundReference.")

/**
* Returns a new copy of this encoder, where the `deserializer` is resolved and bound to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,17 @@ class UDTForCaseClass extends UserDefinedType[UDTCaseClass] {
}
}

case class MyClass(a: String, b: Option[Set[Int]])
case class MyClass2(c: MyClass)
case class MyClass3(a: String, b: Set[Int])
class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)

// implicit val kryo = Encoders.kryo[MyClass2]
implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()


encodeDecodeTest(Seq(MyClass2(MyClass("a", None)), MyClass2(MyClass("b", None))), "testmyclass")
// test flat encoders
encodeDecodeTest(false, "primitive boolean")
encodeDecodeTest(-3.toByte, "primitive byte")
Expand Down