Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Manual TypedEncoders #559

Merged
merged 1 commit into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.locationtech.rasterframes.encoders

import frameless.{RecordEncoderField, TypedEncoder}
import org.apache.spark.sql.FramelessInternals
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, InvokeLike, NewInstance, StaticInvoke}
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal}
import org.apache.spark.sql.types.{DataType, Metadata, StructField, StructType}

import scala.reflect.{ClassTag, classTag}

/** Can be useful for non Scala types and for complicated case classes with implicits in the constructor. */
object ManualTypedEncoder {
/** Invokes apply from the companion object. */
def staticInvoke[T: ClassTag](
fields: List[RecordEncoderField],
fieldNameModify: String => String = identity,
isNullable: Boolean = true
): TypedEncoder[T] = apply[T](fields, { (classTag, newArgs, jvmRepr) => StaticInvoke(classTag.runtimeClass, jvmRepr, "apply", newArgs, propagateNull = true, returnNullable = false) }, fieldNameModify, isNullable)

/** Invokes object constructor. */
def newInstance[T: ClassTag](
fields: List[RecordEncoderField],
fieldNameModify: String => String = identity,
isNullable: Boolean = true
): TypedEncoder[T] = apply[T](fields, { (classTag, newArgs, jvmRepr) => NewInstance(classTag.runtimeClass, newArgs, jvmRepr, propagateNull = true) }, fieldNameModify, isNullable)

def apply[T: ClassTag](
fields: List[RecordEncoderField],
newInstanceExpression: (ClassTag[T], Seq[Expression], DataType) => InvokeLike,
fieldNameModify: String => String = identity,
isNullable: Boolean = true
): TypedEncoder[T] = make[T](fields, newInstanceExpression, fieldNameModify, isNullable, classTag[T])

private def make[T](
// the catalyst struct
fields: List[RecordEncoderField],
// newInstanceExpression for the fromCatalyst function
newInstanceExpression: (ClassTag[T], Seq[Expression], DataType) => InvokeLike,
// allows to convert the field name into the field name getter
fieldNameModify: String => String,
// is the codec nullable
isNullable: Boolean,
// ClassTag is required for the TypedEncoder constructor
// it is passed explicitly to disambiguate ClassTag passed implicitly as a function argument
// and the one from the TypedEncoder constructor
ct: ClassTag[T]
): TypedEncoder[T] = new TypedEncoder[T]()(ct) {
def nullable: Boolean = isNullable

def jvmRepr: DataType = FramelessInternals.objectTypeFor[T]

def catalystRepr: DataType = {
val structFields = fields.map { field =>
StructField(
name = field.name,
dataType = field.encoder.catalystRepr,
nullable = field.encoder.nullable,
metadata = Metadata.empty
)
}

StructType(structFields)
}

def fromCatalyst(path: Expression): Expression = {
val newArgs: Seq[Expression] = fields.map { field =>
field.encoder.fromCatalyst( GetStructField(path, field.ordinal, Some(field.name)) )
}
val newExpr = newInstanceExpression(classTag, newArgs, jvmRepr)

val nullExpr = Literal.create(null, jvmRepr)
If(IsNull(path), nullExpr, newExpr)
}

def toCatalyst(path: Expression): Expression = {
val nameExprs = fields.map { field => Literal(field.name) }

val valueExprs: Seq[Expression] = fields.map { field =>
val fieldPath = Invoke(path, fieldNameModify(field.name), field.encoder.jvmRepr, Nil)
field.encoder.toCatalyst(fieldPath)
}

// the way exprs are encoded in CreateNamedStruct
val exprs = nameExprs.zip(valueExprs).flatMap { case (nameExpr, valueExpr) => nameExpr :: valueExpr :: Nil }

val createExpr = CreateNamedStruct(exprs)
val nullExpr = Literal.create(null, createExpr.dataType)
If(IsNull(path), nullExpr, createExpr)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package org.locationtech.rasterframes.encoders
import org.locationtech.rasterframes.stats.{CellHistogram, CellStatistics, LocalCellStatistics}
import org.locationtech.jts.geom.Envelope
import geotrellis.proj4.CRS
import geotrellis.raster.{CellSize, CellType, Dimensions, Raster, Tile, TileLayout}
import geotrellis.raster.{CellSize, CellType, Dimensions, Raster, Tile, TileLayout, GridBounds, CellGrid}
import geotrellis.layer._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
Expand All @@ -39,34 +39,22 @@ import java.sql.Timestamp
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

/**
* TODO: move this overload to GeoTrellis, the reason is in the generic method invocation and Integral in implicits
*/
object DimensionsInt {
def apply(cols: Int, rows: Int): Dimensions[Int] = new Dimensions(cols, rows)
}

object EnvelopeLocal {
def apply(minx: Double, maxx: Double, miny: Double, maxy: Double): Envelope = new Envelope(minx, maxx, miny, miny)
}

/**
* Implicit encoder definitions for RasterFrameLayer types.
*/
trait StandardEncoders extends SpatialEncoders with TypedEncoders {
def expressionEncoder[T: TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()

implicit def optionalEncoder[T: TypedEncoder]: ExpressionEncoder[Option[T]] = typedExpressionEncoder[Option[T]]

implicit lazy val strMapEncoder: ExpressionEncoder[Map[String, String]] = ExpressionEncoder()
implicit lazy val crsExpressionEncoder: ExpressionEncoder[CRS] = ExpressionEncoder()
implicit lazy val projectedExtentEncoder: ExpressionEncoder[ProjectedExtent] = ExpressionEncoder()
implicit lazy val temporalProjectedExtentEncoder: ExpressionEncoder[TemporalProjectedExtent] = ExpressionEncoder()
implicit lazy val timestampEncoder: ExpressionEncoder[Timestamp] = ExpressionEncoder()
implicit lazy val strMapEncoder: ExpressionEncoder[Map[String, String]] = ExpressionEncoder()
implicit lazy val cellStatsEncoder: ExpressionEncoder[CellStatistics] = ExpressionEncoder()
implicit lazy val cellHistEncoder: ExpressionEncoder[CellHistogram] = ExpressionEncoder()
implicit lazy val localCellStatsEncoder: ExpressionEncoder[LocalCellStatistics] = ExpressionEncoder()
implicit lazy val uriEncoder: ExpressionEncoder[URI] = typedExpressionEncoder[URI]
implicit lazy val quantileSummariesEncoder: ExpressionEncoder[QuantileSummaries] = typedExpressionEncoder[QuantileSummaries]

implicit lazy val uriEncoder: ExpressionEncoder[URI] = typedExpressionEncoder[URI]
implicit lazy val quantileSummariesEncoder: ExpressionEncoder[QuantileSummaries] = typedExpressionEncoder[QuantileSummaries]
implicit lazy val envelopeEncoder: ExpressionEncoder[Envelope] = typedExpressionEncoder
implicit lazy val longExtentEncoder: ExpressionEncoder[LongExtent] = typedExpressionEncoder
implicit lazy val extentEncoder: ExpressionEncoder[Extent] = typedExpressionEncoder
Expand All @@ -76,18 +64,17 @@ trait StandardEncoders extends SpatialEncoders with TypedEncoders {
implicit lazy val temporalKeyEncoder: ExpressionEncoder[TemporalKey] = typedExpressionEncoder
implicit lazy val spaceTimeKeyEncoder: ExpressionEncoder[SpaceTimeKey] = typedExpressionEncoder
implicit def keyBoundsEncoder[K: TypedEncoder]: ExpressionEncoder[KeyBounds[K]] = typedExpressionEncoder[KeyBounds[K]]
implicit def boundsEncoder[K: TypedEncoder]: ExpressionEncoder[Bounds[K]] = keyBoundsEncoder[KeyBounds[K]].asInstanceOf[ExpressionEncoder[Bounds[K]]]
implicit lazy val cellTypeEncoder: ExpressionEncoder[CellType] = typedExpressionEncoder[CellType]
implicit lazy val dimensionsEncoder: ExpressionEncoder[Dimensions[Int]] = typedExpressionEncoder
implicit def dimensionsEncoder[N: Integral: TypedEncoder]: ExpressionEncoder[Dimensions[N]] = typedExpressionEncoder[Dimensions[N]]
implicit def gridBoundsEncoder[N: Integral: TypedEncoder]: ExpressionEncoder[GridBounds[N]] = typedExpressionEncoder[GridBounds[N]]
implicit lazy val layoutDefinitionEncoder: ExpressionEncoder[LayoutDefinition] = typedExpressionEncoder
implicit def tileLayerMetadataEncoder[K: TypedEncoder: ClassTag]: ExpressionEncoder[TileLayerMetadata[K]] = typedExpressionEncoder[TileLayerMetadata[K]]
implicit lazy val tileContextEncoder: ExpressionEncoder[TileContext] = typedExpressionEncoder
implicit lazy val tileDataContextEncoder: ExpressionEncoder[TileDataContext] = typedExpressionEncoder
implicit lazy val cellContextEncoder: ExpressionEncoder[CellContext] = typedExpressionEncoder

implicit lazy val tileEncoder: ExpressionEncoder[Tile] = typedExpressionEncoder
implicit lazy val optionalTileEncoder: ExpressionEncoder[Option[Tile]] = typedExpressionEncoder
implicit lazy val rasterEncoder: ExpressionEncoder[Raster[Tile]] = typedExpressionEncoder
implicit def rasterEncoder[T <: CellGrid[Int]: TypedEncoder]: ExpressionEncoder[Raster[T]] = typedExpressionEncoder[Raster[T]]
}

object StandardEncoders extends StandardEncoders
Loading