Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ lazy val commonScalacOptions = Seq(
"-encoding", "UTF-8",
"-feature",
"-unchecked",
"-Xfatal-warnings",
// "-Xfatal-warnings",
"-Xlint:-missing-interpolator,_",
"-Yinline-warnings",
"-Yno-adapted-args",
Expand Down
12 changes: 12 additions & 0 deletions dataset/src/main/scala/frameless/ColumnSyntax.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package frameless

import shapeless.Witness
import org.apache.spark.sql.Dataset

class ColumnSyntax[T](dataset: Dataset[T]) {
def /[A: TypedEncoder](column: Witness.Lt[Symbol])
(implicit exists: TypedColumn.Exists[T, column.T, A]): TypedColumn[A] = {
val colExpr = dataset.col(column.value.name).as[A](TypedExpressionEncoder[A])
new TypedColumn[A](colExpr)
}
}
58 changes: 27 additions & 31 deletions dataset/src/main/scala/frameless/TypedColumn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ import shapeless._

import scala.annotation.implicitNotFound

sealed trait UntypedExpression[T] {
sealed trait UntypedExpression {
def expr: Expression
}

/** Documentation marked "apache/spark" is thanks to apache/spark Contributors
* at https://github.com/apache/spark, licensed under Apache v2.0 available at
* http://www.apache.org/licenses/LICENSE-2.0
*/
sealed class TypedColumn[T, U](
val expr: Expression)(
implicit
val uencoder: TypedEncoder[U]
) extends UntypedExpression[T] { self =>
sealed class TypedColumn[U](val expr: Expression)(implicit val uencoder: TypedEncoder[U]) extends UntypedExpression {

/** From an untyped Column to a [[TypedColumn]]
*
Expand All @@ -41,8 +37,8 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def ===(other: U): TypedColumn[T, Boolean] = {
new TypedColumn[T, Boolean](untyped === other)
def ===(other: U): TypedColumn[Boolean] = {
new TypedColumn[Boolean](untyped === other)
}

/** Equality test.
Expand All @@ -52,8 +48,8 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def ===(other: TypedColumn[T, U]): TypedColumn[T, Boolean] = {
new TypedColumn[T, Boolean](untyped === other.untyped)
def ===(other: TypedColumn[U]): TypedColumn[Boolean] = {
new TypedColumn[Boolean](untyped === other.untyped)
}

/** Sum of this expression and another expression.
Expand All @@ -64,8 +60,8 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def plus(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] =
new TypedColumn[T, U](self.untyped.plus(u.untyped))
def plus(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] =
new TypedColumn[U](untyped.plus(u.untyped))

/** Sum of this expression and another expression.
* {{{
Expand All @@ -75,7 +71,7 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def +(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = plus(u)
def +(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] = plus(u)

/** Sum of this expression (column) with a constant.
* {{{
Expand All @@ -86,7 +82,7 @@ sealed class TypedColumn[T, U](
* @param u a constant of the same type
* apache/spark
*/
def +(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = new TypedColumn[T, U](self.untyped.plus(u))
def +(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[U] = new TypedColumn[U](untyped.plus(u))

/** Unary minus, i.e. negate the expression.
* {{{
Expand All @@ -96,7 +92,7 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def unary_-(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = new TypedColumn[T, U](-self.untyped)
def unary_-(implicit n: CatalystNumeric[U]): TypedColumn[U] = new TypedColumn[U](-untyped)


/** Subtraction. Subtract the other expression from this expression.
Expand All @@ -107,8 +103,8 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def minus(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] =
new TypedColumn[T, U](self.untyped.minus(u.untyped))
def minus(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] =
new TypedColumn[U](untyped.minus(u.untyped))

/** Subtraction. Subtract the other expression from this expression.
* {{{
Expand All @@ -118,7 +114,7 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def -(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = minus(u)
def -(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] = minus(u)

/** Subtraction. Subtract the other expression from this expression.
* {{{
Expand All @@ -129,7 +125,7 @@ sealed class TypedColumn[T, U](
* @param u a constant of the same type
* apache/spark
*/
def -(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = new TypedColumn[T, U](self.untyped.minus(u))
def -(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[U] = new TypedColumn[U](untyped.minus(u))

/** Multiplication of this expression and another expression.
* {{{
Expand All @@ -139,8 +135,8 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def multiply(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] =
new TypedColumn[T, U](self.untyped.multiply(u.untyped))
def multiply(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] =
new TypedColumn[U](untyped.multiply(u.untyped))

/** Multiplication of this expression and another expression.
* {{{
Expand All @@ -150,7 +146,7 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def *(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = multiply(u)
def *(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[U] = multiply(u)

/** Multiplication of this expression a constant.
* {{{
Expand All @@ -160,7 +156,7 @@ sealed class TypedColumn[T, U](
*
* apache/spark
*/
def *(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, U] = new TypedColumn[T, U](self.untyped.multiply(u))
def *(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[U] = new TypedColumn[U](untyped.multiply(u))

/**
* Division this expression by another expression.
Expand All @@ -172,7 +168,7 @@ sealed class TypedColumn[T, U](
* @param u another column of the same type
* apache/spark
*/
def divide(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, Double] = new TypedColumn[T, Double](self.untyped.divide(u.untyped))
def divide(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[Double] = new TypedColumn[Double](untyped.divide(u.untyped))

/**
* Division this expression by another expression.
Expand All @@ -184,7 +180,7 @@ sealed class TypedColumn[T, U](
* @param u another column of the same type
* apache/spark
*/
def /(u: TypedColumn[T, U])(implicit n: CatalystNumeric[U]): TypedColumn[T, Double] = divide(u)
def /(u: TypedColumn[U])(implicit n: CatalystNumeric[U]): TypedColumn[Double] = divide(u)

/**
* Division this expression by another expression.
Expand All @@ -196,27 +192,27 @@ sealed class TypedColumn[T, U](
* @param u a constant of the same type
* apache/spark
*/
def /(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, Double] = new TypedColumn[T, Double](self.untyped.divide(u))
def /(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[Double] = new TypedColumn[Double](untyped.divide(u))

/** Casts the column to a different type.
* {{{
* df.select(df('a).cast[Int])
* }}}
*/
def cast[A: TypedEncoder](implicit c: CatalystCast[U, A]): TypedColumn[T, A] =
new TypedColumn(self.untyped.cast(TypedEncoder[A].targetDataType))
def cast[A: TypedEncoder](implicit c: CatalystCast[U, A]): TypedColumn[A] =
new TypedColumn(untyped.cast(TypedEncoder[A].targetDataType))
}

sealed trait TypedAggregate[T, A] extends UntypedExpression[T] {
sealed trait TypedAggregate[A] extends UntypedExpression {
def expr: Expression
def aencoder: TypedEncoder[A]
}

sealed class TypedAggregateAndColumn[T, A, U](expr: Expression)(
sealed class TypedAggregateAndColumn[A, U](expr: Expression)(
implicit
val aencoder: TypedEncoder[A],
uencoder: TypedEncoder[U]
) extends TypedColumn[T, U](expr) with TypedAggregate[T, A] {
) extends TypedColumn[U](expr) with TypedAggregate[A] {

def this(column: Column)(implicit aencoder: TypedEncoder[A], uencoder: TypedEncoder[U]) {
this(FramelessInternals.expr(column))
Expand Down
Loading