Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions dataset/src/main/scala/frameless/TypedWindow.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package frameless

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ UnspecifiedFrame, WindowFrame }
import org.apache.spark.sql.expressions.{ Window, WindowSpec }
import shapeless.ops.hlist.{ Mapper, ToTraversable }
import shapeless.{ HList, ProductArgs }

/** Marker type: `TypedWindow.orderBy` adds it to the window's second type parameter. */
trait OrderedWindow
/** Marker type: `TypedWindow.partitionBy` adds it to the window's second type parameter. */
trait PartitionedWindow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using a HKT instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean by that here. I use these traits to ensure that a partition or an ordering has been assigned to a window, as some window functions require one or both to be present (e.g. denseRank needs an ordering). An ordering might be on multiple disparate types and has no effect on the return type of the window. What would be gained by making OrderedWindow an HKT here, or do you mean something else?


/** A typed description of a Spark window over rows of `T`.
  *
  * The second type parameter is a phantom type: `partitionBy` adds
  * `PartitionedWindow` to it and `orderBy` adds `OrderedWindow`, so the type
  * records which parts of the window have been specified.
  *
  * @param partitionSpec expressions the window is partitioned by
  * @param orderSpec expressions the rows of the window are ordered by
  * @param frame window frame; passed along by the builders but not yet applied in `untyped`
  * @tparam T row type the window's columns belong to
  * @tparam A phantom type accumulating `PartitionedWindow` / `OrderedWindow`
  */
class TypedWindow[T, A] private (
  partitionSpec: Seq[UntypedExpression[T]],
  orderSpec: Seq[UntypedExpression[T]],
  frame: WindowFrame //TODO. Really a rows or range between
) {

  /** The plain Spark `WindowSpec` built from the partition and order expressions. */
  def untyped: WindowSpec = {
    val partitionColumns = partitionSpec.map(spec => new Column(spec.expr))
    val orderColumns = orderSpec.map(spec => new Column(spec.expr))
    //TODO: frame
    Window.partitionBy(partitionColumns: _*).orderBy(orderColumns: _*)
  }

  /** Partitions by one column; replaces any partitioning set earlier. */
  def partitionBy[U](column: TypedColumn[T, U]): TypedWindow[T, A with PartitionedWindow] =
    partitionByMany(column)

  /** Partitions by two columons; replaces any partitioning set earlier. */
  def partitionBy[U, V](
    column1: TypedColumn[T, U],
    column2: TypedColumn[T, V]
  ): TypedWindow[T, A with PartitionedWindow] =
    partitionByMany(column1, column2)

  /** Partitions by three columns; replaces any partitioning set earlier. */
  def partitionBy[U, V, W](
    column1: TypedColumn[T, U],
    column2: TypedColumn[T, V],
    column3: TypedColumn[T, W]
  ): TypedWindow[T, A with PartitionedWindow] =
    partitionByMany(column1, column2, column3)

  /** Variadic form of `partitionBy`: `partitionByMany(c1, c2, ...)` is forwarded
    * to `applyProduct` with the arguments packed into an HList.
    */
  object partitionByMany extends ProductArgs {
    def applyProduct[U <: HList](columns: U)(
      implicit
      listEv: ToTraversable.Aux[U, List, TypedColumn[T, _]]
    ): TypedWindow[T, A with PartitionedWindow] = {
      val partitionColumns = columns.toList[TypedColumn[T, _]]
      // Ordering and frame are carried over unchanged from this window.
      new TypedWindow[T, A with PartitionedWindow](partitionColumns, orderSpec, frame)
    }
  }

  /** Orders by one column; replaces any ordering set earlier. */
  def orderBy[U](column: SortedTypedColumn[T, U]): TypedWindow[T, A with OrderedWindow] =
    orderByMany(column)

  /** Orders by two columns; replaces any ordering set earlier. */
  def orderBy[U, V](
    column1: SortedTypedColumn[T, U],
    column2: SortedTypedColumn[T, V]
  ): TypedWindow[T, A with OrderedWindow] =
    orderByMany(column1, column2)

  /** Orders by three columns; replaces any ordering set earlier. */
  def orderBy[U, V, W](
    column1: SortedTypedColumn[T, U],
    column2: SortedTypedColumn[T, V],
    column3: SortedTypedColumn[T, W]
  ): TypedWindow[T, A with OrderedWindow] =
    orderByMany(column1, column2, column3)

  /** Variadic form of `orderBy`. Every argument is first mapped through
    * `SortedTypedColumn.defaultAscendingPoly` (presumably giving unsorted
    * columns an ascending order -- confirm against `SortedTypedColumn`).
    */
  object orderByMany extends ProductArgs {
    def applyProduct[U <: HList, O <: HList](columns: U)(
      implicit
      sortMapper: Mapper.Aux[SortedTypedColumn.defaultAscendingPoly.type, U, O],
      listEv: ToTraversable.Aux[O, List, SortedTypedColumn[T, _]]
    ): TypedWindow[T, A with OrderedWindow] = {
      val sortedColumns = sortMapper(columns).toList[SortedTypedColumn[T, _]]
      // Partitioning and frame are carried over unchanged from this window.
      new TypedWindow[T, A with OrderedWindow](partitionSpec, sortedColumns, frame)
    }
  }
}

object TypedWindow {

/** Starts a window that is ordered by one column and has no partitioning. */
def orderBy[T](column: SortedTypedColumn[T, _]): TypedWindow[T, OrderedWindow] = {
  //TODO: This is some ugly syntax
  val builder = new orderByManyNew[T]
  builder.apply(column)
}

/** Starts a window that is ordered by two columns and has no partitioning. */
def orderBy[T](
  column1: SortedTypedColumn[T, _],
  column2: SortedTypedColumn[T, _]
): TypedWindow[T, OrderedWindow] = {
  val builder = new orderByManyNew[T]
  builder.apply(column1, column2)
}

/** Starts a window that is ordered by three columns and has no partitioning. */
def orderBy[T](
  column1: SortedTypedColumn[T, _],
  column2: SortedTypedColumn[T, _],
  column3: SortedTypedColumn[T, _]
): TypedWindow[T, OrderedWindow] = {
  val builder = new orderByManyNew[T]
  builder.apply(column1, column2, column3)
}

//Need different name because companion class has `orderByMany` defined as well
//Need a class and not object in order to define what `T` is explicitly. Otherwise it's a mess
//This makes for some pretty horrid syntax though.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very open to suggestions on how to make this syntax nicer. https://github.com/typelevel/frameless/pull/248/files#diff-c5eeb2825da1c4337deff8bccfefb194R93 is pretty gross in my view.

/** Variadic builder for a fresh ordered window over rows of `T`.
  *
  * The arguments of `apply(c1, c2, ...)` reach `applyProduct` as an HList and
  * are mapped through `SortedTypedColumn.defaultAscendingPoly` before being
  * used as the order spec. The result has no partitioning and an
  * `UnspecifiedFrame`.
  */
class orderByManyNew[T] extends ProductArgs {
  def applyProduct[U <: HList, O <: HList](columns: U)(
    implicit
    sortMapper: Mapper.Aux[SortedTypedColumn.defaultAscendingPoly.type, U, O],
    listEv: ToTraversable.Aux[O, List, SortedTypedColumn[T, _]]
  ): TypedWindow[T, OrderedWindow] = {
    val sortedColumns = sortMapper(columns).toList[SortedTypedColumn[T, _]]
    new TypedWindow[T, OrderedWindow](Seq.empty, sortedColumns, UnspecifiedFrame)
  }
}

/** Starts a window that is partitioned by one column and has no ordering. */
def partitionBy[T](column: TypedColumn[T, _]): TypedWindow[T, PartitionedWindow] = {
  val builder = new partitionByManyNew[T]
  builder.apply(column)
}

/** Starts a window that is partitioned by two columns and has no ordering. */
def partitionBy[T](
  column1: TypedColumn[T, _],
  column2: TypedColumn[T, _]
): TypedWindow[T, PartitionedWindow] = {
  val builder = new partitionByManyNew[T]
  builder.apply(column1, column2)
}

/** Starts a window that is partitioned by three columns and has no ordering. */
def partitionBy[T](
  column1: TypedColumn[T, _],
  column2: TypedColumn[T, _],
  column3: TypedColumn[T, _]
): TypedWindow[T, PartitionedWindow] = {
  val builder = new partitionByManyNew[T]
  builder.apply(column1, column2, column3)
}

/** Variadic builder for a fresh partitioned window over rows of `T`.
  *
  * The arguments of `apply(c1, c2, ...)` reach `applyProduct` as an HList and
  * become the partition spec. The result has no ordering and an
  * `UnspecifiedFrame`.
  */
class partitionByManyNew[T] extends ProductArgs {
  def applyProduct[U <: HList](columns: U)(
    implicit
    listEv: ToTraversable.Aux[U, List, TypedColumn[T, _]]
  ): TypedWindow[T, PartitionedWindow] = {
    val partitionColumns = columns.toList[TypedColumn[T, _]]
    new TypedWindow[T, PartitionedWindow](partitionColumns, Seq.empty, UnspecifiedFrame)
  }
}
}

17 changes: 17 additions & 0 deletions dataset/src/main/scala/frameless/functions/WindowFunctions.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package frameless.functions

import frameless.{ OrderedWindow, TypedColumn, TypedWindow }
import org.apache.spark.sql.{ functions => untyped }

trait WindowFunctions {

//TODO: TypedAggregate version that can be used in `agg`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Punting to a later PR. I'm still unsure how to do this right.

// whose specs are all either aggs or in the groupBy. Not sure how to do the latter one
/** Typed wrapper around Spark's `dense_rank()` evaluated over a window.
  *
  * @param over the window to rank within; the bound on `A` only admits windows
  *             whose phantom type includes `OrderedWindow`
  * @return an `Int` column built from `dense_rank().over(over.untyped)`
  */
def denseRank[T, A <: OrderedWindow](over: TypedWindow[T, A]): TypedColumn[T, Int] = {
  val rankedColumn = untyped.dense_rank().over(over.untyped)
  new TypedColumn[T, Int](rankedColumn)
}

}

//TODO: Move these to the other funcs?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should WindowFunctions be its own object, or should we lump them in with the other functions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is it done in vanilla? I would follow that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in with the regular functions in functions.scala, but that file is also 2500 lines long, which seems a bit excessive.
We've also split out AggregateFunctions, despite those also originally coming from functions.scala.

/** Object form of the `WindowFunctions` trait, so its members can be imported with `WindowFunctions._`. */
object WindowFunctions extends WindowFunctions
41 changes: 41 additions & 0 deletions dataset/src/test/scala/frameless/WindowTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package frameless

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{ functions => sfuncs }
import frameless.functions.WindowFunctions._
import org.scalacheck.Prop
import org.scalacheck.Prop._

// Fixture row types for the window tests.
// NOTE(review): neither class is referenced by the "dense rank" test in this file, which uses X2/X3 instead.
object WindowTests {
// Row with an Int field `a` and a String field `b`.
case class Foo(a: Int, b: String)
// Same fields as Foo plus an Int `rank` field.
case class FooRank(a: Int, b: String, rank: Int)
}

/** Property tests comparing typed window functions with their plain Spark equivalents. */
class WindowTests extends TypedDatasetSuite {
  test("dense rank") {
    def prop[
      A : TypedEncoder,
      B : TypedEncoder : CatalystOrdered
    ](data: Vector[X2[A, B]]): Prop = {
      val dataset = TypedDataset.create(data)

      // Reference result: the same computation through the untyped Spark API.
      val sparkWindow = Window.partitionBy("a").orderBy("b")
      val rankedFrame = dataset.toDF()
        .withColumn("c", sfuncs.dense_rank().over(sparkWindow))
      val expected = TypedDataset.createUnsafe[X3[A, B, Int]](rankedFrame)
        .collect().run().toVector

      // Result under test: the typed window and the typed denseRank.
      val typedWindow = TypedWindow.orderBy(dataset[B]('b))
        .partitionBy(dataset('a))
      val rankColumn = denseRank(typedWindow)
      val actual = dataset.withColumn[X3[A, B, Int]](rankColumn)
        .collect().run().toVector

      actual ?= expected
    }

    check(forAll(prop[Int, String] _))
    check(forAll(prop[SQLDate, SQLDate] _))
    check(forAll(prop[String, Boolean] _))
  }
}