[Final] Unifying typed column and aggregate column. #153
Conversation
Codecov Report
```
@@            Coverage Diff            @@
##           master     #153     +/-  ##
=========================================
+ Coverage   96.62%   96.83%    +0.2%
=========================================
  Files          52       52
  Lines         860      852       -8
  Branches       11       11
=========================================
- Hits          831      825       -6
+ Misses         29       27       -2
```
Continue to review full report at Codecov.
Force-pushed: ee4d28e to 7496a83
Hi @kanterov and @OlivierBlanvillain. I think this is a step towards the right approach of unifying the typed and aggregate columns.
I'm not sure if it's the optimal approach. What if we do it as it was done before (18e1cdd)? Was there any problem with it except confusion?
In this pull request … Or we can even use phantom types, something like:

```scala
private sealed abstract class UntypedColumn[T, U](expr: Expression)(
  implicit encoder: TypedEncoder[U])

sealed trait TypedColumnTag[T, U]
sealed trait TypedAggregateTag[T, U]

type TypedAggregate[T, U] = UntypedColumn[T, U] with TypedAggregateTag[T, U]
type TypedColumn[T, U] = UntypedColumn[T, U] with TypedColumnTag[T, U]
type TypedColumnAndAggregate[T, U] =
  UntypedColumn[T, U] with TypedColumnTag[T, U] with TypedAggregateTag[T, U]
```
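For reference, a minimal self-contained sketch of this phantom-tag idea (the names and class bodies below are illustrative placeholders, not the frameless API): the same underlying column is refined with marker traits, so an aggregate-only method can reject plain columns at compile time.

```scala
object PhantomTagSketch {
  // Toy stand-in for the untyped column (a String instead of a Spark Expression).
  class UntypedColumn[T, U](val expr: String)
  trait TypedColumnTag[T, U]
  trait TypedAggregateTag[T, U]

  type TypedColumn[T, U]    = UntypedColumn[T, U] with TypedColumnTag[T, U]
  type TypedAggregate[T, U] = UntypedColumn[T, U] with TypedAggregateTag[T, U]

  // Only columns carrying the aggregate tag are accepted here.
  def agg[T, U](c: TypedAggregate[T, U]): String = c.expr

  val sumCol: TypedAggregate[Int, Long] =
    new UntypedColumn[Int, Long]("sum(a)") with TypedAggregateTag[Int, Long]
  val plainCol: TypedColumn[Int, Long] =
    new UntypedColumn[Int, Long]("a") with TypedColumnTag[Int, Long]

  agg(sumCol)      // compiles: carries the aggregate tag
  // agg(plainCol) // rejected: plainCol lacks TypedAggregateTag
}
```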
Force-pushed: 7496a83 to f2ef3f8
@OlivierBlanvillain working with Frameless is really not fun without solving this. Each time you need to do something to a column you aggregate, well, you cannot. You need to create a new dataset and then do something on that column (like cast it, multiply it, etc.). This PR lets you do things like:

```scala
c.groupBy(c('a)).agg(count()/2, sum(c('b)) * 2, sum(c('d)) > 10)
```

Whereas now the only way to do this is:

```scala
val tmp = c.groupBy(c('a)).agg(count(), sum(c('b)), sum(c('d)))
tmp.select(tmp('_1)/2, tmp('_2) * 2, tmp('_3) > 10)
```

Not cool.
@OlivierBlanvillain I am all done with what I wanted to do with this PR. Will you have any time to look at this?

I will, hopefully by the end of the week 😄
OlivierBlanvillain left a comment:
@imarios sorry for leaving this unreviewed for so long. I have to say I don't really find the encoding... But I don't really see a better way, and this is already minimal in terms of duplicated code, so I guess my feeling alone is not a good reason to hold back on these changes. I would be interested to hear if someone has ideas for another way to achieve the same thing (ping @kanterov) 😄
```scala
/** Creates a typed column of either TypedColumn or TypedAggregate.
  */
protected def mkLit[U1: TypedEncoder](c: U1): TC[U1]
```
Not a fan of the mk prefix here, it sounds like this is doing more than it actually does. What about def lit and def typed?
```scala
def this(column: Column)(implicit uencoder: TypedEncoder[U]) {
  this(FramelessInternals.expr(column))
}
```

```scala
type TC[A] <: AbstractTypedColumn[T, A]
```
hmm, that's F-bounded polymorphism, the heaviest hammer available... But I don't really see a better way here. Still, I have a couple of questions:
- Why a type member instead of a type parameter?
- Why not expose `T` in `TC` as well? This is definitely something I'm going to need for my work on joins (given that `T` tracks the "source" of a column).
Both of these are things I considered. No particular reason for taking this approach other than that it simplifies the type signatures throughout. It does expose less, so given that you already have a use case that needs `T`, I can make these changes.
@OlivierBlanvillain I didn't find a nice way to do the same thing I do with the type member. Any suggestions? It gets into this recursive type definition and I find myself having to use a type lambda. This is what I have:

```scala
AbstractTypedColumn[T, U, TC[T, _] <: AbstractTypedColumn[T, U, _]]
```

But I would want to write something like:

```scala
AbstractTypedColumn[T, U, TC[T, ?] <: AbstractTypedColumn[T, U, ?]]
```

Idk, unless you have any other ideas, I think a type member looks much better for doing this than a type parameter.
I guess you could just write `TC[_, _]` (without the `<:` part), no need to constrain it further given that it's for internal use only.
I take that back, you obviously need the `<: AbstractTypedColumn` bound to be able to call methods on `TC`... Here is the type parameter version:

```scala
def AbstractTypedColumn
  [T, U, ThisType[x, y] <: AbstractTypedColumn[x, y, ThisType]] ...
```

I see no benefit in using that instead of the type member version, so let's stick to what's already here.
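For reference, here is a hedged, self-contained sketch of that type-parameter encoding (simplified names, not the real frameless signatures): classic F-bounded polymorphism where a higher-kinded `ThisType` parameter is threaded through every signature, which is the cost the type-member version avoids.

```scala
object FBoundedParamSketch {
  // The higher-kinded parameter ThisType is bounded by the class itself.
  abstract class AbstractTypedColumn[T, U, ThisType[x, y] <: AbstractTypedColumn[x, y, ThisType]] {
    def typed[A](expr: String): ThisType[T, A]
    // Written once, returns the concrete column kind of the subclass.
    def cast[A]: ThisType[T, A] = typed[A]("cast")
  }

  final class TypedColumn[T, U]
      extends AbstractTypedColumn[T, U, TypedColumn] {
    def typed[A](expr: String): TypedColumn[T, A] = new TypedColumn[T, A]
  }

  final class TypedAggregate[T, U]
      extends AbstractTypedColumn[T, U, TypedAggregate] {
    def typed[A](expr: String): TypedAggregate[T, A] = new TypedAggregate[T, A]
  }

  val a: TypedAggregate[Unit, Long] = new TypedAggregate[Unit, Int].cast[Long]
}
```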
```scala
object aggregate extends AggregateFunctions
object nonAggregate extends NonAggregateFunctions
```

```scala
private def typedColumnToAggregate[A: TypedEncoder, T](a: TypedColumn[T, A]): TypedAggregate[T, A] =
```
I think both of these methods are used exactly once. Any reason not to inline them?
Probably thought initially that they would be used more often and then forgot to revise. Yes, I can inline these.
```scala
package frameless

import frameless.functions.aggregate._
import frameless.functions._
```
Unused import?
I can check
it is needed for lit.
```scala
  * @param u another column of the same type
  * apache/spark
  */
def <(u: TypedColumn[T, U])(implicit canOrder: CatalystOrdered[U]): TC[Boolean] =
```
Why is `<` from `TypedColumn` to `TC`, whereas `or` is from `TC` to `TC`?
Good catch, missed this one.
I think the next 3 functions also have the same typo
```scala
def prop(xs: List[Long]): Prop = {
  val dataset = TypedDataset.create(xs.map(X1(_)))
  val A = dataset.col[Long]('a)
  val datasetMax = dataset.agg(max(A) * 2).collect().run().headOption
```
How confident are you that all operations are supported on aggregated columns? Given what you've found with orderBy I wouldn't be surprised if only a subset was, so maybe we should be a bit more exhaustive. If there was a way to do that without duplicating all the column tests, that would be ideal.
Pretty confident. All the columns inside an `agg()` (after you apply an aggregation method) are treated as regular columns and they support all ops. This is a bit different from `orderBy`, where the selected columns are not really projections, just columns you use to order the data. On the other hand, with `select()` and `agg()`, the columns included there are the columns of the resulting dataframe (its schema), and I found those to be consistent.
@imarios I played with it a bit and I'm getting more and more convinced that there is no better way to do that :) I just pushed a commit that exposes the second type parameter and renames …

Looks great! Thanks @OlivierBlanvillain :) I didn't even know that this pattern was called F-bounded polymorphism, so thank you for the lesson! I will do another quick pass, rebase, squash and merge.
Force-pushed: 3d72e2f to 46d7d58
@OlivierBlanvillain, ok so during my "quick pass" I realized that a large collection of methods under …

```scala
val t = TypedDataset.create(("a","b")::("a","c")::Nil)
t.groupBy(t('_1)).agg(concatWs(":", first(t('_2)), last(t('_2)))).show().run
+---+---+
| _1| _2|
+---+---+
|  a|b:c|
+---+---+
```
```scala
def atan2[A, T](l: AbstractTypedColumn[T, A], r: Double)
  (implicit
    evCanBeDoubleL: CatalystCast[A, Double]): l.ThisType[T, Double] =
  atan2(l, l.lit(r)).asInstanceOf[l.ThisType[T, Double]]
```
Why is this cast needed?
Good question, idk, but it doesn't compile otherwise. Probably some type inference quirk. It works for the other overload, where the right parameter is an `AbstractTypedColumn[T, A]`, but it doesn't work for this one here. I am kind of "helping" the compiler with the `asInstanceOf`. If you already have this branch locally, can you give it a shot?
It's because it calls into `atan2`, which doesn't check that both `l` and `r` have the same type. I'll fix it.
```scala
def levenshtein[T](l: TypedColumn[T, String], r: TypedColumn[T, String]): TypedColumn[T, Int] = {
  new TypedColumn[T, Int](untyped.levenshtein(l.untyped, r.untyped))
}
def levenshtein[T](l: AbstractTypedColumn[T, String], r: AbstractTypedColumn[T, String]): l.ThisType[T, Int] =
```
Why not r.ThisType? Does this even make sense when l.ThisType != r.ThisType?
Same question for concat and concatWs
Meaning that one is an aggregate column and the other one is not? Let me try what would happen there.
Yea, good catch! This one fails in case you use one that is aggregate and one that is not ... so all of them need to be of the same kind (either all aggregates or all projections).
Thought I found an easy solution ... was kind of surprised this didn't work ...

```scala
def concatWs[T, G[_,_] <: AbstractTypedColumn[T, String]](sep: String,
    c1: G[T, String],
    rest: G[T, String]*): c1.ThisType[T, String] =
  c1.typed(untyped.concat_ws(sep, (c1 +: rest).map(_.untyped): _*))
```
Even with -Ypartial-unification scalac can't infer these types :(
Never seen this done this way. Nice :).

Yeah, even doing this, it doesn't really force all `G`s to be of the same type. It would still compile when you have one being an aggregate and one being projected.
Well in theory it should do the right thing, but type inference can't handle the `[_, _]` here. This is the signature I have for levenshtein:

```scala
def levenshtein[T, ColumnType[a, b] <: AbstractTypedColumn[a, b]](
  l: AbstractTypedColumn[T, String] { type ThisType[a, b] = ColumnType[a, b] },
  r: AbstractTypedColumn[T, String] { type ThisType[a, b] = ColumnType[a, b] }
): ColumnType[T, Int] =
  l.typed(untyped.levenshtein(l.untyped, r.untyped))
```

I think we can just go for the dirty solution: duplicate all the methods involving two AbstractTypedColumn.
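For reference, a hedged, self-contained illustration of this refinement-type trick (toy names, not the frameless types): pinning both parameters to the same `ThisType` member rejects mixing column kinds, at least when the type constructor is supplied explicitly.

```scala
object RefinementSketch {
  abstract class Col[T, U] {
    type ThisType[a, b] <: Col[a, b]
    def typed[A]: ThisType[T, A]
  }
  final class Plain[T, U] extends Col[T, U] {
    type ThisType[a, b] = Plain[a, b]
    def typed[A]: Plain[T, A] = new Plain[T, A]
  }
  final class Agg[T, U] extends Col[T, U] {
    type ThisType[a, b] = Agg[a, b]
    def typed[A]: Agg[T, A] = new Agg[T, A]
  }

  // Both arguments must expose ThisType = C, i.e. be the same kind of column.
  def combine[T, C[a, b] <: Col[a, b]](
    l: Col[T, String] { type ThisType[a, b] = C[a, b] },
    r: Col[T, String] { type ThisType[a, b] = C[a, b] }
  ): C[T, Int] = l.typed[Int]

  val ok = combine[Unit, Plain](new Plain[Unit, String], new Plain[Unit, String])
  // combine[Unit, Plain](new Plain[Unit, String], new Agg[Unit, String])
  //   does not compile: Agg's ThisType is not Plain
}
```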
Ohh, I think I got it(?)

Usage:

```scala
scala> val t1 = TypedDataset.create((1,"b")::(2,"c")::Nil)
t1: frameless.TypedDataset[(Int, String)] = [_1: int, _2: string]

scala> import frameless.functions.nonAggregate._
import frameless.functions.nonAggregate._

scala> t1.agg(concatWs(",", sum(t1('_1)).cast[String], t1('_2))).show().run
<console>:29: error: Cannot prove that frameless.TypedAggregate[_, _] =:= frameless.TypedColumn[_, _].
       t1.agg(concatWs(",", sum(t1('_1)).cast[String], t1('_2))).show().run
                       ^

scala> t1.agg(concatWs(",", sum(t1('_1)).cast[String], sum(t1('_1)).cast[String])).show().run
+---+
| _1|
+---+
|3,3|
+---+
```

Here is what seemed to work:

```scala
def concatWs[T, G1[a,b] <: AbstractTypedColumn[a, b], G2[a,b] <: AbstractTypedColumn[a, b]](sep: String,
    c1: G1[T, String],
    rest: G2[T, String]*)(implicit eq: G1[_,_] =:= G2[_,_]): c1.ThisType[T, String] =
  c1.typed(untyped.concat_ws(sep, (c1 +: rest).map(_.untyped): _*))
```
Nice 😄. But I'm not sure it's worth the generality, see my latest commit where I duplicate these methods...
```scala
  })
}

def stringFuncProp[A : Encoder](strFunc: TypedColumn[X1[String], String] => TypedColumn[X1[String], A], sparkFunc: Column => Column) = {
```
Why can't we keep this one? So much boilerplate...
I learned this one the hard way (actually spent quite some time trying to make this work). Everything I tried failed in one way or the other with the same compiler error:

```
NonAggregateFunctionsTests.scala:611: method with dependent type (str: frameless.AbstractTypedColumn[T,String])str.ThisType[T,String] cannot be converted to function value
```

I think the error is pretty clear. Unless you have an ace up your sleeve, I'm not sure we can do much more here than bear the extra code. Note that by writing the extra code I actually had the time to make the tests a bit more realistic. The random Strings from scalacheck always result in non-English characters, so upper-casing and replacing numbers never really do anything. I constrained the generators a bit more and actually injected numbers and whitespaces to make sure that we are testing with data that triggers the logic in question.
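For context, here is a tiny self-contained reproduction of that class of error (toy types, not the actual test code): as soon as a method's result type depends on its argument, it can no longer be eta-expanded into a plain function value, which is what the old `stringFuncProp` helper needed.

```scala
object DependentEtaSketch {
  trait Col { type Out }

  // Dependent method type: the result type mentions the argument `c`.
  def widen(c: Col): c.Out = ???

  // val f = widen _
  // ^ fails with: "method with dependent type (c: Col)c.Out
  //   cannot be converted to function value"
}
```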
```scala
package functions

import frameless.functions.nonAggregate._
import org.apache.spark.sql.{ Column, Encoder }
import org.apache.spark.sql.{Encoder, functions => untyped}
```
It's renamed this way, `{functions => sparkFunctions}`, in two other places, so this should probably be consistent (I know you just moved it around, but since we are at it...).
```scala
def concat[T](c1: AbstractTypedColumn[T, String],
    rest: AbstractTypedColumn[T, String]*): c1.ThisType[T, String] =
  c1.typed(untyped.concat((c1 +: rest).map(_.untyped): _*))
def concat[T](c1: TypedColumn[T, String], xs: TypedColumn[T, String]*): TypedColumn[T, String] =
```
Looks much cleaner :D yeah, not worth the generality for sure. Now that we don't need the dependent type, we can actually go back to having one vararg parameter.
@imarios do you mind adding a few more tests to please codecov?
Force-pushed: bf3bb15 to 703eb5d
@OlivierBlanvillain added quite a few more tests. Codecov coverage is in fact getting higher by merging this PR.
```scala
  */
def concat[T](c1: TypedColumn[T, String], xs: TypedColumn[T, String]*): TypedColumn[T, String] =
  c1.typed(untyped.concat((c1 +: xs).map(_.untyped): _*))
def concat[T](columns: TypedColumn[T, String]*): TypedColumn[T, String] =
```
Does this break when called with no arguments?
This is what we had before this PR.

```scala
scala> t.select(concat())
res1: frameless.TypedDataset[String] = [_1: string]

scala> t.select(concat()).show().run
+---+
| _1|
+---+
|   |
|   |
+---+

scala> t.agg(concatWs(",")).show().run
+---+
| _1|
+---+
|   |
+---+
```
works ok for both agg and select and for all variations.
Force-pushed: 703eb5d to 5cd3979
@OlivierBlanvillain any last comments here? If it looks good I will rebase and merge.

LGTM!
Force-pushed: 5cd3979 to bbe975d
@imarios I think master fails because of this PR, maybe something went wrong in the rebase?
This fixes #148 as well. It essentially uses a base class for both TypedColumn and TypedAggregate that implements the vast majority of the methods once. Now, whatever you can do with a TypedColumn you can also do with a TypedAggregate (compare, multiply, cast, etc.).

At its core, this implementation uses a type member to help with type inference. This ensures that aggregated types cannot be used where a simple column is expected and vice versa.
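To make that description concrete, here is a hedged, self-contained sketch of the pattern (simplified placeholders, not the actual frameless code): the base class implements operations once in terms of the type member, each subclass fixes the member to itself, and a method that expects aggregates still rejects plain columns.

```scala
object UnifiedColumnSketch {
  // Shared base: operations are written once against the abstract member TC.
  abstract class AbstractTypedColumn[T, U] {
    type TC[A] <: AbstractTypedColumn[T, A]
    protected def typed[A](expr: String): TC[A]

    def multiply(other: TC[U]): TC[U] = typed[U]("multiply")
    def cast[A]: TC[A]                = typed[A]("cast")
  }

  final class TypedColumn[T, U](val expr: String) extends AbstractTypedColumn[T, U] {
    type TC[A] = TypedColumn[T, A]
    protected def typed[A](e: String): TypedColumn[T, A] = new TypedColumn[T, A](e)
  }

  final class TypedAggregate[T, U](val expr: String) extends AbstractTypedColumn[T, U] {
    type TC[A] = TypedAggregate[T, A]
    protected def typed[A](e: String): TypedAggregate[T, A] = new TypedAggregate[T, A](e)
  }

  // A method that only accepts aggregates still rejects plain columns.
  def agg[T, U](a: TypedAggregate[T, U]): String = a.expr

  val s = new TypedAggregate[Unit, Long]("sum(a)")
  agg(s.multiply(s).cast[Long])           // compiles: result is still a TypedAggregate
  // agg(new TypedColumn[Unit, Long]("a")) // rejected at compile time
}
```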