diff --git a/dataset/src/main/scala/frameless/CanAccess.scala b/dataset/src/main/scala/frameless/CanAccess.scala
new file mode 100644
index 000000000..20ccfed8a
--- /dev/null
+++ b/dataset/src/main/scala/frameless/CanAccess.scala
@@ -0,0 +1,21 @@
+package frameless
+
+/** `CanAccess[_, A with B]` indicates that in this context it is possible to
+ * access columns from both table `A` and table `B`. The first type parameter
+ * is a dummy argument used for type inference.
+ */
+sealed trait CanAccess[-T, X]
+
+object CanAccess {
+  private[this] val theInstance = new CanAccess[Nothing, Nothing] {}
+  private[frameless] def localCanAccessInstance[X]: CanAccess[Any, X] = theInstance.asInstanceOf[CanAccess[Any, X]]
+
+  implicit def globalCanAccessInstance[X]: CanAccess[X, X] = theInstance.asInstanceOf[CanAccess[X, X]]
+  // The trick works as follows: `(df: TypedDataset[T]).col('a)` looks for a
+  // CanAccess[T, T], which is always available thanks to the
+  // `globalCanAccessInstance` implicit defined above. Expressions for joins
+  // (and other multi-dataset operations) take an `implicit a: CanAccess[Any, U with T] =>`
+  // closure. Because the first (dummy) type parameter of `CanAccess` is
+  // contravariant, the locally defined implicit is always preferred over
+  // `globalCanAccessInstance`, which implements the desired behavior.
+}
diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala
index f36403163..ea988aeb8 100644
--- a/dataset/src/main/scala/frameless/TypedDataset.scala
+++ b/dataset/src/main/scala/frameless/TypedDataset.scala
@@ -2,13 +2,14 @@ package frameless
 
 import frameless.ops._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateStruct, EqualTo}
-import org.apache.spark.sql.catalyst.plans.logical.{Join, Project}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter, FullOuter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter}
 import org.apache.spark.sql._
 import shapeless._
 import shapeless.ops.hlist.{Prepend, ToTraversable, Tupler}
+import CanAccess.localCanAccessInstance
 
 /** [[TypedDataset]] is a safer interface for working with `Dataset`.
  *
@@ -163,13 +164,14 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
    *
    * It is statically checked that column with such name exists and has type `A`.
    */
-  def col[A](column: Witness.Lt[Symbol])(
+  def col[A, X](column: Witness.Lt[Symbol])(
     implicit
+    ca: CanAccess[T, X],
     exists: TypedColumn.Exists[T, column.T, A],
     encoder: TypedEncoder[A]
-  ): TypedColumn[T, A] = {
+  ): TypedColumn[X, A] = {
     val colExpr = dataset.col(column.value.name).as[A](TypedExpressionEncoder[A])
-    new TypedColumn[T, A](colExpr)
+    new TypedColumn[X, A](colExpr)
   }
 
   object colMany extends SingletonProductArgs {
@@ -288,6 +290,82 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
   ): GroupedByManyOps[T, TK, K, KT] = new GroupedByManyOps[T, TK, K, KT](self, groupedBy)
 }
 
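Aside for reviewers (not part of the patch): the contravariance trick documented in `CanAccess` is the heart of this change, so here is a self-contained sketch of the resolution it relies on. The names `Ab`, `Cd`, `col`, and `join` are illustrative only:

```scala
object CanAccessDemo extends App {
  sealed trait CanAccess[-T, X]
  val instance = new CanAccess[Nothing, Nothing] {}
  implicit def global[X]: CanAccess[X, X] = instance.asInstanceOf[CanAccess[X, X]]

  case class Ab(a: Int)
  case class Cd(c: Int)

  // Mirrors TypedDataset#col: T is fixed by the dataset, X is fixed by
  // whichever CanAccess instance the compiler finds.
  def col[T, X](implicit ca: CanAccess[T, X]): CanAccess[T, X] = ca

  // Outside a join only `global` applies, so X = T:
  val single: CanAccess[Ab, Ab] = col[Ab, Ab]

  // A join condition receives a local CanAccess[Any, Ab with Cd]. Because the
  // first parameter is contravariant, that value also has type
  // CanAccess[Ab, Ab with Cd], something `global` can never produce, so inside
  // the closure columns resolve against the joined type Ab with Cd:
  def join(condition: CanAccess[Any, Ab with Cd] => Unit): Unit =
    condition(instance.asInstanceOf[CanAccess[Any, Ab with Cd]])

  join { implicit local =>
    val joined: CanAccess[Ab, Ab with Cd] = col[Ab, Ab with Cd]
    ()
  }

  println("both resolutions compile")
}
```

The diff resumes below with the join methods that exploit this instance.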
+  /** Computes the inner join of `this` `Dataset` with the `other` `Dataset`,
+   * returning a `Tuple2` for each pair where `condition` evaluates to true.
+   */
+  def joinInner[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean])
+    (implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] = {
+      import FramelessInternals._
+      val leftPlan = logicalPlan(dataset)
+      val rightPlan = logicalPlan(other.dataset)
+      val join = resolveSelfJoin(Join(leftPlan, rightPlan, Inner, Some(condition(localCanAccessInstance).expr)))
+      val joinedPlan = joinPlan(dataset, join, leftPlan, rightPlan)
+      val joinedDs = mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(T, U)])
+      TypedDataset.create[(T, U)](joinedDs)
+    }
+
+  /** Computes the Cartesian product of `this` `Dataset` with the `other` `Dataset`. */
+  def joinCross[U](other: TypedDataset[U])
+    (implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] =
+      new TypedDataset(self.dataset.joinWith(other.dataset, new Column(Literal(true)), "cross"))
+
+  /** Computes the full outer join of `this` `Dataset` with the `other` `Dataset`,
+   * returning a `Tuple2` of `Option`s: one for each pair where `condition`
+   * evaluates to true, plus `None`-padded rows for the unmatched rows of
+   * either side.
+   */
+  def joinFull[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean])
+    (implicit e: TypedEncoder[(Option[T], Option[U])]): TypedDataset[(Option[T], Option[U])] = {
+      import FramelessInternals._
+      val leftPlan = logicalPlan(dataset)
+      val rightPlan = logicalPlan(other.dataset)
+      val join = resolveSelfJoin(Join(leftPlan, rightPlan, FullOuter, Some(condition(localCanAccessInstance).expr)))
+      val joinedPlan = joinPlan(dataset, join, leftPlan, rightPlan)
+      val joinedDs = mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(Option[T], Option[U])])
+      TypedDataset.create[(Option[T], Option[U])](joinedDs)
+    }
+
+  /** Computes the right outer join of `this` `Dataset` with the `other` `Dataset`,
+   * returning a `Tuple2` for each pair where `condition` evaluates to true,
+   * plus `(None, u)` for each unmatched row of `other`.
+   */
+  def joinRight[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean])
+    (implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] = {
+      import FramelessInternals._
+      val leftPlan = logicalPlan(dataset)
+      val rightPlan = logicalPlan(other.dataset)
+      val join = resolveSelfJoin(Join(leftPlan, rightPlan, RightOuter, Some(condition(localCanAccessInstance).expr)))
+      val joinedPlan = joinPlan(dataset, join, leftPlan, rightPlan)
+      val joinedDs = mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(Option[T], U)])
+      TypedDataset.create[(Option[T], U)](joinedDs)
+    }
+
+  /** Computes the left outer join of `this` `Dataset` with the `other` `Dataset`,
+   * returning a `Tuple2` for each pair where `condition` evaluates to true,
+   * plus `(t, None)` for each unmatched row of `this`.
+   */
+  def joinLeft[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean])
+    (implicit e: TypedEncoder[(T, Option[U])]): TypedDataset[(T, Option[U])] = {
+      import FramelessInternals._
+      val leftPlan = logicalPlan(dataset)
+      val rightPlan = logicalPlan(other.dataset)
+      val join = resolveSelfJoin(Join(leftPlan, rightPlan, LeftOuter, Some(condition(localCanAccessInstance).expr)))
+      val joinedPlan = joinPlan(dataset, join, leftPlan, rightPlan)
+      val joinedDs = mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(T, Option[U])])
+
+      TypedDataset.create[(T, Option[U])](joinedDs)
+    }
+
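Aside (not part of the patch): a sketch of how the condition-closure syntax reads at a call site. `Person`, `City`, and the two datasets are assumed for illustration; the tuple encoders are derived by frameless:

```scala
import frameless.TypedDataset

case class Person(city: String, name: String)
case class City(city: String, population: Long)

def example(people: TypedDataset[Person], cities: TypedDataset[City]) = {
  // The `implicit a =>` closure brings CanAccess[Any, Person with City] into
  // scope, so col on either dataset infers X = Person with City and the
  // comparison type-checks across the two datasets.
  val matched: TypedDataset[(Person, City)] =
    people.joinInner(cities)(implicit a => people.col('city) === cities.col('city))

  // The outer variants reflect possibly missing sides in the result type:
  val all: TypedDataset[(Person, Option[City])] =
    people.joinLeft(cities)(implicit a => people.col('city) === cities.col('city))

  (matched, all)
}
```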
+  /** Computes the left semi join of `this` `Dataset` with the `other` `Dataset`,
+   * returning the rows of `this` for which `condition` evaluates to true on
+   * at least one row of `other`.
+   */
+  def joinLeftSemi[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean]): TypedDataset[T] =
+    new TypedDataset(self.dataset.join(other.dataset, condition(localCanAccessInstance).untyped, "leftsemi")
+      .as[T](TypedExpressionEncoder(encoder)))
+
+  /** Computes the left anti join of `this` `Dataset` with the `other` `Dataset`,
+   * returning the rows of `this` for which `condition` evaluates to true on
+   * no row of `other`.
+   */
+  def joinLeftAnti[U](other: TypedDataset[U])(condition: CanAccess[Any, T with U] => TypedColumn[T with U, Boolean]): TypedDataset[T] =
+    new TypedDataset(self.dataset.join(other.dataset, condition(localCanAccessInstance).untyped, "leftanti")
+      .as[T](TypedExpressionEncoder(encoder)))
+
   /** Fixes SPARK-6231, for more details see original code in [[Dataset#join]] **/
   private def resolveSelfJoin(join: Join): Join = {
     val plan = FramelessInternals.ofRows(dataset.sparkSession, join).queryExecution.analyzed.asInstanceOf[Join]
@@ -315,84 +393,39 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
     }
   }
 
-  def join[A, B](
-    right: TypedDataset[A],
-    leftCol: TypedColumn[T, B],
-    rightCol: TypedColumn[A, B]
-  ): TypedDataset[(T, A)] = {
-    implicit def re = right.encoder
-
-    val leftPlan = FramelessInternals.logicalPlan(dataset)
-    val rightPlan = FramelessInternals.logicalPlan(right.dataset)
-    val condition = EqualTo(leftCol.expr, rightCol.expr)
-
-    val join = resolveSelfJoin(Join(leftPlan, rightPlan, Inner, Some(condition)))
-    val joined = FramelessInternals.executePlan(dataset, join)
-    val leftOutput = joined.analyzed.output.take(leftPlan.output.length)
-    val rightOutput = joined.analyzed.output.takeRight(rightPlan.output.length)
-
-    val joinedPlan = Project(List(
-      Alias(CreateStruct(leftOutput), "_1")(),
-      Alias(CreateStruct(rightOutput), "_2")()
-    ), joined.analyzed)
-
-    val joinedDs = FramelessInternals.mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(T, A)])
-
-    TypedDataset.create[(T, A)](joinedDs)
-  }
-
-  def joinLeft[A: TypedEncoder, B](
-    right: TypedDataset[A],
-    leftCol: TypedColumn[T, B],
-    rightCol: TypedColumn[A, B]
-  )(implicit e: TypedEncoder[(T, Option[A])]): TypedDataset[(T, Option[A])] = {
-    val leftPlan = FramelessInternals.logicalPlan(dataset)
-    val rightPlan = FramelessInternals.logicalPlan(right.dataset)
-    val condition = EqualTo(leftCol.expr, rightCol.expr)
-
-    val join = resolveSelfJoin(Join(leftPlan, rightPlan, LeftOuter, Some(condition)))
-    val joined = FramelessInternals.executePlan(dataset, join)
-    val leftOutput = joined.analyzed.output.take(leftPlan.output.length)
-    val rightOutput = joined.analyzed.output.takeRight(rightPlan.output.length)
-
-    val joinedPlan = Project(List(
-      Alias(CreateStruct(leftOutput), "_1")(),
-      Alias(CreateStruct(rightOutput), "_2")()
-    ), joined.analyzed)
-
-    val joinedDs = FramelessInternals.mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(T, Option[A])])
-
-    TypedDataset.create[(T, Option[A])](joinedDs)
-  }
-
   /** Takes a function from A => R and converts it to a UDF for TypedColumn[T, A] => TypedColumn[T, R].
    */
   def makeUDF[A: TypedEncoder, R: TypedEncoder](f: A => R):
-    TypedColumn[T, A] => TypedColumn[T, R] = functions.udf(f)
+    TypedColumn[T, A] => TypedColumn[T, R] =
+      functions.udf(f)
 
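Aside (not part of the patch): `makeUDF` at a call site, sketched with an assumed `Person` dataset. Applying the resulting function to a column of the wrong type, or to a column of another dataset, fails to compile:

```scala
import frameless.{TypedColumn, TypedDataset}

case class Person(city: String, name: String)

def example(people: TypedDataset[Person]): TypedDataset[String] = {
  // Lift an ordinary Scala function into a UDF scoped to this dataset.
  val shout: TypedColumn[Person, String] => TypedColumn[Person, String] =
    people.makeUDF((name: String) => name.toUpperCase)

  people.select(shout(people.col('name)))
}
```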
   /** Takes a function from (A1, A2) => R and converts it to a UDF for
    * (TypedColumn[T, A1], TypedColumn[T, A2]) => TypedColumn[T, R].
    */
   def makeUDF[A1: TypedEncoder, A2: TypedEncoder, R: TypedEncoder](f: (A1, A2) => R):
-    (TypedColumn[T, A1], TypedColumn[T, A2]) => TypedColumn[T, R] = functions.udf(f)
+    (TypedColumn[T, A1], TypedColumn[T, A2]) => TypedColumn[T, R] =
+      functions.udf(f)
 
   /** Takes a function from (A1, A2, A3) => R and converts it to a UDF for
    * (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3]) => TypedColumn[T, R].
    */
   def makeUDF[A1: TypedEncoder, A2: TypedEncoder, A3: TypedEncoder, R: TypedEncoder](f: (A1, A2, A3) => R):
-    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3]) => TypedColumn[T, R] = functions.udf(f)
+    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3]) => TypedColumn[T, R] =
+      functions.udf(f)
 
   /** Takes a function from (A1, A2, A3, A4) => R and converts it to a UDF for
    * (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4]) => TypedColumn[T, R].
    */
   def makeUDF[A1: TypedEncoder, A2: TypedEncoder, A3: TypedEncoder, A4: TypedEncoder, R: TypedEncoder](f: (A1, A2, A3, A4) => R):
-    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4]) => TypedColumn[T, R] = functions.udf(f)
+    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4]) => TypedColumn[T, R] =
+      functions.udf(f)
 
   /** Takes a function from (A1, A2, A3, A4, A5) => R and converts it to a UDF for
    * (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4], TypedColumn[T, A5]) => TypedColumn[T, R].
    */
   def makeUDF[A1: TypedEncoder, A2: TypedEncoder, A3: TypedEncoder, A4: TypedEncoder, A5: TypedEncoder, R: TypedEncoder](f: (A1, A2, A3, A4, A5) => R):
-    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4], TypedColumn[T, A5]) => TypedColumn[T, R] = functions.udf(f)
+    (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4], TypedColumn[T, A5]) => TypedColumn[T, R] =
+      functions.udf(f)
 
   /** Type-safe projection from type T to Tuple1[A]
    * {{{
diff --git a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala
index 934546db5..7eb0c171b 100644
--- a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala
+++ b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala
@@ -1,7 +1,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.{Alias, CreateStruct}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.types.ObjectType
@@ -23,17 +24,25 @@ object FramelessInternals {
 
   def logicalPlan(ds: Dataset[_]): LogicalPlan = ds.logicalPlan
 
-  def executePlan(ds: Dataset[_], plan: LogicalPlan): QueryExecution = {
+  def executePlan(ds: Dataset[_], plan: LogicalPlan): QueryExecution =
     ds.sparkSession.sessionState.executePlan(plan)
+
+  def joinPlan(ds: Dataset[_], plan: LogicalPlan, leftPlan: LogicalPlan, rightPlan: LogicalPlan): LogicalPlan = {
+    val joined = executePlan(ds, plan)
+    val leftOutput = joined.analyzed.output.take(leftPlan.output.length)
+    val rightOutput = joined.analyzed.output.takeRight(rightPlan.output.length)
+
+    Project(List(
+      Alias(CreateStruct(leftOutput), "_1")(),
+      Alias(CreateStruct(rightOutput), "_2")()
+    ), joined.analyzed)
   }
 
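Aside (not part of the patch): the plan `joinPlan` builds, schematically. The two struct aliases are what let the joined rows deserialize directly as a `Tuple2`:

```scala
// Schematic only. For a leftPlan with output [a, b] and a rightPlan with
// output [a, c], joinPlan wraps the analyzed join in a projection:
//
//   Project [struct(a, b) AS _1, struct(a, c) AS _2]
//   +- Join <joinType>, <condition>
//      :- <leftPlan>
//      +- <rightPlan>
//
// TypedExpressionEncoder[(T, U)] then reads _1 and _2 as the tuple halves.
```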
-  def mkDataset[T](sqlContext: SQLContext, plan: LogicalPlan, encoder: Encoder[T]): Dataset[T] = {
+  def mkDataset[T](sqlContext: SQLContext, plan: LogicalPlan, encoder: Encoder[T]): Dataset[T] =
     new Dataset(sqlContext, plan, encoder)
-  }
 
-  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
+  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
     Dataset.ofRows(sparkSession, logicalPlan)
-  }
 
   // because org.apache.spark.sql.types.UserDefinedType is private[spark]
   type UserDefinedType[A >: Null] = org.apache.spark.sql.types.UserDefinedType[A]
diff --git a/dataset/src/test/scala/frameless/ColTests.scala b/dataset/src/test/scala/frameless/ColTests.scala
index ad62aa068..59d419fcb 100644
--- a/dataset/src/test/scala/frameless/ColTests.scala
+++ b/dataset/src/test/scala/frameless/ColTests.scala
@@ -13,18 +13,18 @@ class ColTests extends TypedDatasetSuite {
     x4.col('a)
     t4.col('_1)
 
-    x4.col[Int]('a)
-    t4.col[Int]('_1)
+    x4.col[Int, X4[Int, String, Long, Boolean]]('a)
+    t4.col[Int, (Int, String, Long, Boolean)]('_1)
 
-    illTyped("x4.col[String]('a)", "No column .* of type String in frameless.X4.*")
+    illTyped("x4.col[String, X4[Int, String, Long, Boolean]]('a)", "No column .* of type String in frameless.X4.*")
 
     x4.col('b)
     t4.col('_2)
 
-    x4.col[String]('b)
-    t4.col[String]('_2)
+    x4.col[String, X4[Int, String, Long, Boolean]]('b)
+    t4.col[String, (Int, String, Long, Boolean)]('_2)
 
-    illTyped("x4.col[Int]('b)", "No column .* of type Int in frameless.X4.*")
+    illTyped("x4.col[Int, X4[Int, String, Long, Boolean]]('b)", "No column .* of type Int in frameless.X4.*")
 
     ()
   }
diff --git a/dataset/src/test/scala/frameless/GroupByTests.scala b/dataset/src/test/scala/frameless/GroupByTests.scala
index ea7f3e36d..5828d8cfe 100644
--- a/dataset/src/test/scala/frameless/GroupByTests.scala
+++ b/dataset/src/test/scala/frameless/GroupByTests.scala
@@ -16,8 +16,8 @@ class GroupByTests extends TypedDatasetSuite {
     widen: B => Out
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val datasetSumByA = dataset.groupByMany(A).agg(sum(B)).collect().run.toVector.sortBy(_._1)
     val sumByA = data.groupBy(_.a).mapValues(_.map(_.b).map(widen).sum).toVector.sortBy(_._1)
@@ -34,7 +34,7 @@ class GroupByTests extends TypedDatasetSuite {
     summable: CatalystSummable[A, A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetSum = dataset.agg(sum(A)).collect().run().toVector
     val listSum = data.map(_.a).sum
@@ -55,8 +55,8 @@ class GroupByTests extends TypedDatasetSuite {
     bs: CatalystSummable[B, B]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val datasetSum = dataset.agg(sum(A), sum(B)).collect().run().toVector
     val listSumA = data.map(_.a).sum
@@ -80,9 +80,9 @@ class GroupByTests extends TypedDatasetSuite {
     cs: CatalystSummable[C, C]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val datasetSum = dataset.agg(sum(A), sum(B), sum(C)).collect().run().toVector
     val listSumA = data.map(_.a).sum
@@ -109,10 +109,10 @@ class GroupByTests extends TypedDatasetSuite {
     fo: CatalystOrdered[D]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
-    val D = dataset.col[D]('d)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
+    val D = dataset.col('d)
 
     val datasetSum = dataset.agg(sum(A), sum(B), min(C), max(D)).collect().run().toVector
     val listSumA = data.map(_.a).sum
@@ -139,8 +139,8 @@ class GroupByTests extends TypedDatasetSuite {
     widen: B => Out
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val datasetSumByA = dataset.groupBy(A).agg(sum(B)).collect().run.toVector.sortBy(_._1)
     val sumByA = data.groupBy(_.a).mapValues(_.map(_.b).map(widen).sum).toVector.sortBy(_._1)
@@ -157,8 +157,8 @@ class GroupByTests extends TypedDatasetSuite {
     B: TypedEncoder : Numeric
   ](data: List[X2[A, B]]): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val datasetSumByA = dataset.groupBy(A)
       .mapGroups { case (a, xs) => (a, xs.map(_.b).sum) }
@@ -186,9 +186,9 @@ class GroupByTests extends TypedDatasetSuite {
     widenc: C => OutC
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val framelessSumBC = dataset
       .groupBy(A)
@@ -255,9 +255,9 @@ class GroupByTests extends TypedDatasetSuite {
     widenc: C => OutC
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val framelessSumC = dataset
       .groupBy(A,B)
@@ -330,10 +330,10 @@ class GroupByTests extends TypedDatasetSuite {
     widend: D => OutD
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
-    val D = dataset.col[D]('d)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
+    val D = dataset.col('d)
 
     val datasetSumByAB = dataset
       .groupBy(A, B)
@@ -359,9 +359,9 @@ class GroupByTests extends TypedDatasetSuite {
     C: TypedEncoder : Numeric
   ](data: List[X3[A, B, C]]): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val datasetSumByAB = dataset
       .groupBy(A, B)
@@ -384,7 +384,7 @@ class GroupByTests extends TypedDatasetSuite {
     B: TypedEncoder
   ](data: Vector[X2[A, B]]): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetGrouped = dataset
       .groupBy(A)
@@ -407,7 +407,7 @@ class GroupByTests extends TypedDatasetSuite {
     B: TypedEncoder : Ordering
   ](data: Vector[X2[A, B]]): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetGrouped = dataset
       .groupBy(A)
@@ -435,8 +435,8 @@ class GroupByTests extends TypedDatasetSuite {
     C: TypedEncoder : Ordering
   ](data: Vector[X3[A, B, C]]): Prop = {
     val dataset = TypedDataset.create(data)
-    val cA = dataset.col[A]('a)
-    val cB = dataset.col[B]('b)
+    val cA = dataset.col('a)
+    val cB = dataset.col('b)
 
     val datasetGrouped = dataset
       .groupBy(cA, cB)
diff --git a/dataset/src/test/scala/frameless/JoinTests.scala b/dataset/src/test/scala/frameless/JoinTests.scala
index 5bf65027d..3b73aed64 100644
--- a/dataset/src/test/scala/frameless/JoinTests.scala
+++ b/dataset/src/test/scala/frameless/JoinTests.scala
@@ -4,7 +4,7 @@ import org.scalacheck.Prop
 import org.scalacheck.Prop._
 
 class JoinTests extends TypedDatasetSuite {
-  test("ab.joinLeft(ac, ab.a, ac.a)") {
+  test("ab.joinRight(ac)(ab.a == ac.a)") {
     def prop[
       A : TypedEncoder : Ordering,
       B : TypedEncoder : Ordering,
@@ -13,7 +13,62 @@ class JoinTests extends TypedDatasetSuite {
       val leftDs = TypedDataset.create(left)
       val rightDs = TypedDataset.create(right)
       val joinedDs = leftDs
-        .joinLeft(rightDs, leftDs.col('a), rightDs.col('a))
+        .joinRight(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
+        .collect().run().toVector.sorted
+
+      val leftKeys = left.map(_.a).toSet
+      val joined = {
+        for {
+          ab <- left
+          ac <- right if ac.a == ab.a
+        } yield (Some(ab), ac)
+      }.toVector ++ {
+        for {
+          ac <- right if !leftKeys.contains(ac.a)
+        } yield (None, ac)
+      }.toVector
+
+      (joined.sorted ?= joinedDs) && (joinedDs.map(_._2).toSet ?= right.toSet)
+    }
+
+    check(forAll(prop[Int, Long, String] _))
+  }
+
+  test("ab.joinInner(ac)(ab.a == ac.a)") {
+    def prop[
+      A : TypedEncoder : Ordering,
+      B : TypedEncoder : Ordering,
+      C : TypedEncoder : Ordering
+    ](left: List[X2[A, B]], right: List[X2[A, C]]): Prop = {
+      val leftDs = TypedDataset.create(left)
+      val rightDs = TypedDataset.create(right)
+      val joinedDs = leftDs
+        .joinInner(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
+        .collect().run().toVector.sorted
+
+      val joined = {
+        for {
+          ab <- left
+          ac <- right if ac.a == ab.a
+        } yield (ab, ac)
+      }.toVector
+
+      joined.sorted ?= joinedDs
+    }
+
+    check(forAll(prop[Int, Long, String] _))
+  }
+
+  test("ab.joinLeft(ac)(ab.a == ac.a)") {
+    def prop[
+      A : TypedEncoder : Ordering,
+      B : TypedEncoder : Ordering,
+      C : TypedEncoder : Ordering
+    ](left: List[X2[A, B]], right: List[X2[A, C]]): Prop = {
+      val leftDs = TypedDataset.create(left)
+      val rightDs = TypedDataset.create(right)
+      val joinedDs = leftDs
+        .joinLeft(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
         .collect().run().toVector.sorted
 
       val rightKeys = right.map(_.a).toSet
@@ -34,7 +89,7 @@ class JoinTests extends TypedDatasetSuite {
     check(forAll(prop[Int, Long, String] _))
   }
 
-  test("ab.join(ac, ab.a, ac.a)") {
+  test("ab.joinFull(ac)(ab.a == ac.a)") {
     def prop[
       A : TypedEncoder : Ordering,
       B : TypedEncoder : Ordering,
@@ -43,23 +98,108 @@ class JoinTests extends TypedDatasetSuite {
       val leftDs = TypedDataset.create(left)
       val rightDs = TypedDataset.create(right)
       val joinedDs = leftDs
-        .join(rightDs, leftDs.col('a), rightDs.col('a))
+        .joinFull(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
         .collect().run().toVector.sorted
 
+      val rightKeys = right.map(_.a).toSet
+      val leftKeys = left.map(_.a).toSet
       val joined = {
         for {
           ab <- left
           ac <- right if ac.a == ab.a
+        } yield (Some(ab), Some(ac))
+      }.toVector ++ {
+        for {
+          ab <- left if !rightKeys.contains(ab.a)
+        } yield (Some(ab), None)
+      }.toVector ++ {
+        for {
+          ac <- right if !leftKeys.contains(ac.a)
+        } yield (None, Some(ac))
+      }.toVector
+
+      (joined.sorted ?= joinedDs)
+    }
+
+    check(forAll(prop[Int, Long, String] _))
+  }
+
+  test("ab.joinCross(ac)") {
+    def prop[
+      A : TypedEncoder : Ordering,
+      B : TypedEncoder : Ordering,
+      C : TypedEncoder : Ordering
+    ](left: List[X2[A, B]], right: List[X2[A, C]]): Prop = {
+      val leftDs = TypedDataset.create(left)
+      val rightDs = TypedDataset.create(right)
+      val joinedDs = leftDs
+        .joinCross(rightDs)
+        .collect().run().toVector.sorted
+
+      val joined = {
+        for {
+          ab <- left
+          ac <- right
         } yield (ab, ac)
       }.toVector
 
+      (joined.sorted ?= joinedDs)
+    }
+
+    check(forAll(prop[Int, Long, String] _))
+  }
+
+  test("ab.joinLeftSemi(ac)(ab.a == ac.a)") {
+    def prop[
+      A : TypedEncoder : Ordering,
+      B : TypedEncoder : Ordering,
+      C : TypedEncoder : Ordering
+    ](left: List[X2[A, B]], right: List[X2[A, C]]): Prop = {
+      val leftDs = TypedDataset.create(left)
+      val rightDs = TypedDataset.create(right)
+      val rightKeys = right.map(_.a).toSet
+      val joinedDs = leftDs
+        .joinLeftSemi(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
+        .collect().run().toVector.sorted
+
+      val joined = {
+        for {
+          ab <- left if rightKeys.contains(ab.a)
+        } yield ab
+      }.toVector
+
+      joined.sorted ?= joinedDs
+    }
+
+    check(forAll(prop[Int, Long, String] _))
+  }
+
+  test("ab.joinLeftAnti(ac)(ab.a == ac.a)") {
+    def prop[
+      A : TypedEncoder : Ordering,
+      B : TypedEncoder : Ordering,
+      C : TypedEncoder : Ordering
+    ](left: List[X2[A, B]], right: List[X2[A, C]]): Prop = {
+      val leftDs = TypedDataset.create(left)
+      val rightDs = TypedDataset.create(right)
+      val rightKeys = right.map(_.a).toSet
+      val joinedDs = leftDs
+        .joinLeftAnti(rightDs)(implicit a => leftDs.col('a) === rightDs.col('a))
+        .collect().run().toVector.sorted
+
+      val joined = {
+        for {
+          ab <- left if !rightKeys.contains(ab.a)
+        } yield ab
+      }.toVector
+
       joined.sorted ?= joinedDs
     }
 
     check(forAll(prop[Int, Long, String] _))
   }
 
-  test("self join") {
+  test("self inner join") {
     def prop[
       A : TypedEncoder : Ordering,
       B : TypedEncoder : Ordering
@@ -68,7 +208,7 @@ class JoinTests extends TypedDatasetSuite {
       val count = ds.dataset.join(ds.dataset,
         ds.dataset.col("a") === ds.dataset.col("a")).count()
 
-      val countDs = ds.join(ds, ds.col('a), ds.col('a))
+      val countDs = ds.joinInner(ds)(implicit a => ds.col('a) === ds.col('a))
         .count().run()
 
       count ?= countDs
diff --git a/dataset/src/test/scala/frameless/SelectTests.scala b/dataset/src/test/scala/frameless/SelectTests.scala
index d90466ccd..0d563bec3 100644
--- a/dataset/src/test/scala/frameless/SelectTests.scala
+++ b/dataset/src/test/scala/frameless/SelectTests.scala
@@ -15,7 +15,7 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val dataset2 = dataset.select(A).collect().run().toVector
     val data2 = data.map { case X4(a, _, _, _) => a }
@@ -39,8 +39,8 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val dataset2 = dataset.select(A, B).collect().run().toVector
     val data2 = data.map { case X4(a, b, _, _) => (a, b) }
@@ -64,9 +64,9 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val dataset2 = dataset.select(A, B, C).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, _) => (a, b, c) }
@@ -90,10 +90,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d) }
@@ -117,10 +117,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a) }
@@ -144,10 +144,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1, a3).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a, c) }
@@ -171,10 +171,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1, a3, a2).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a, c, b) }
@@ -198,10 +198,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1, a3, a2, a1).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a, c, b, a) }
@@ -225,10 +225,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1, a3, a2, a1, a3).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a, c, b, a, c) }
@@ -252,10 +252,10 @@ class SelectTests extends TypedDatasetSuite {
     ca: ClassTag[A]
   ): Prop = {
     val dataset = TypedDataset.create(data)
-    val a1 = dataset.col[A]('a)
-    val a2 = dataset.col[B]('b)
-    val a3 = dataset.col[C]('c)
-    val a4 = dataset.col[D]('d)
+    val a1 = dataset.col('a)
+    val a2 = dataset.col('b)
+    val a3 = dataset.col('c)
+    val a4 = dataset.col('d)
 
     val dataset2 = dataset.select(a1, a2, a3, a4, a1, a3, a2, a1, a3, a4).collect().run().toVector
     val data2 = data.map { case X4(a, b, c, d) => (a, b, c, d, a, c, b, a, c, d) }
@@ -397,4 +397,4 @@ class SelectTests extends TypedDatasetSuite {
     val e = TypedDataset.create[(Int, String, Long)]((1, "a", 2L) :: (2, "b", 4L) :: (2, "b", 1L) :: Nil)
     illTyped("""e.select(frameless.functions.aggregate.sum(e('_1)))""")
   }
-}
\ No newline at end of file
+}
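Aside (not part of the patch): why the test diffs get shorter. With the extra `X` parameter, both of `col`'s type arguments are inferred together, and Scala does not allow supplying only the first one, which is why `ColTests` now spells out both where it used to write `col[Int]('a)`:

```scala
// Assuming the X4 fixture from the test sources and
// ds: TypedDataset[X4[Int, String, Long, Boolean]]:
//
//   ds.col('a)                                        // A and X both inferred
//   ds.col[Int, X4[Int, String, Long, Boolean]]('a)   // fully annotated form
//   ds.col[Int]('a)                                   // no longer compiles
```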
diff --git a/dataset/src/test/scala/frameless/functions/AggregateFunctionsTests.scala b/dataset/src/test/scala/frameless/functions/AggregateFunctionsTests.scala
index 8090efb83..2cd0ca98f 100644
--- a/dataset/src/test/scala/frameless/functions/AggregateFunctionsTests.scala
+++ b/dataset/src/test/scala/frameless/functions/AggregateFunctionsTests.scala
@@ -41,7 +41,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
     summer: Sum4Tests[A, Out]
   ): Prop = {
     val dataset = TypedDataset.create(xs.map(X1(_)))
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetSum: List[Out] = dataset.agg(sum(A)).collect().run().toList
 
@@ -81,7 +81,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
     summer: Sum4Tests[A, Out]
   ): Prop = {
     val dataset = TypedDataset.create(xs.map(X1(_)))
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetSum: List[Out] = dataset.agg(sumDistinct(A)).collect().run().toList
 
@@ -115,7 +115,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
     averager: Averager4Tests[A, Out]
   ): Prop = {
     val dataset = TypedDataset.create(xs.map(X1(_)))
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetAvg: Vector[Out] = dataset.agg(avg(A)).collect().run().toVector
 
@@ -148,7 +148,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   def prop[A: TypedEncoder : CatalystVariance : Numeric](xs: List[A]): Prop = {
     val numeric = implicitly[Numeric[A]]
     val dataset = TypedDataset.create(xs.map(X1(_)))
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     val datasetStdOpt = dataset.agg(stddev(A)).collect().run().toVector.headOption
     val datasetVarOpt = dataset.agg(variance(A)).collect().run().toVector.headOption
@@ -185,7 +185,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   test("count('a)") {
     def prop[A: TypedEncoder](xs: List[A]): Prop = {
       val dataset = TypedDataset.create(xs.map(X1(_)))
-      val A = dataset.col[A]('a)
+      val A = dataset.col('a)
       val datasetCount = dataset.agg(count(A)).collect().run()
 
       datasetCount ?= List(xs.size.toLong)
@@ -198,7 +198,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   test("max") {
     def prop[A: TypedEncoder: CatalystOrdered](xs: List[A])(implicit o: Ordering[A]): Prop = {
       val dataset = TypedDataset.create(xs.map(X1(_)))
-      val A = dataset.col[A]('a)
+      val A = dataset.col('a)
       val datasetMax = dataset.agg(max(A)).collect().run().toList
 
       datasetMax ?= xs.reduceOption(o.max).toList
@@ -215,7 +215,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   test("min") {
     def prop[A: TypedEncoder: CatalystOrdered](xs: List[A])(implicit o: Ordering[A]): Prop = {
       val dataset = TypedDataset.create(xs.map(X1(_)))
-      val A = dataset.col[A]('a)
+      val A = dataset.col('a)
 
       val datasetMin = dataset.agg(min(A)).collect().run().toList
 
@@ -233,7 +233,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   test("first") {
     def prop[A: TypedEncoder](xs: List[A]): Prop = {
       val dataset = TypedDataset.create(xs.map(X1(_)))
-      val A = dataset.col[A]('a)
+      val A = dataset.col('a)
 
       val datasetFirst = dataset.agg(first(A)).collect().run().toList
 
@@ -252,7 +252,7 @@ class AggregateFunctionsTests extends TypedDatasetSuite {
   test("last") {
     def prop[A: TypedEncoder](xs: List[A]): Prop = {
       val dataset = TypedDataset.create(xs.map(X1(_)))
-      val A = dataset.col[A]('a)
+      val A = dataset.col('a)
 
       val datasetLast = dataset.agg(last(A)).collect().run().toList
 
diff --git a/dataset/src/test/scala/frameless/functions/UdfTests.scala b/dataset/src/test/scala/frameless/functions/UdfTests.scala
index e63e1d67b..a92d4859c 100644
--- a/dataset/src/test/scala/frameless/functions/UdfTests.scala
+++ b/dataset/src/test/scala/frameless/functions/UdfTests.scala
@@ -11,7 +11,7 @@ class UdfTests extends TypedDatasetSuite {
     val dataset: TypedDataset[X1[A]] = TypedDataset.create(data)
     val u1 = udf[X1[A], A, B](f1)
     val u2 = dataset.makeUDF(f1)
-    val A = dataset.col[A]('a)
+    val A = dataset.col('a)
 
     // filter forces whole codegen
     val codegen = dataset.deserialized.filter((_:X1[A]) => true).select(u1(A)).collect().run().toVector
@@ -51,9 +51,9 @@ class UdfTests extends TypedDatasetSuite {
     val u12 = dataset.makeUDF(f1)
     val u22 = dataset.makeUDF(f2)
     val u32 = dataset.makeUDF(f3)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val dataset21 = dataset.select(u11(A), u21(B), u31(C)).collect().run().toVector
     val dataset22 = dataset.select(u12(A), u22(B), u32(C)).collect().run().toVector
@@ -73,8 +73,8 @@ class UdfTests extends TypedDatasetSuite {
     val dataset = TypedDataset.create(data)
     val u1 = udf[X3[A, B, C], A, B, C](f1)
     val u2 = dataset.makeUDF(f1)
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
 
     val dataset21 = dataset.select(u1(A, B)).collect().run().toVector
     val dataset22 = dataset.select(u2(A, B)).collect().run().toVector
@@ -96,9 +96,9 @@ class UdfTests extends TypedDatasetSuite {
     val u21 = udf[X3[A, B, C], B, C, A](f2)
     val u22 = dataset.makeUDF(f2)
 
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val dataset21 = dataset.select(u11(A, B), u21(B, C)).collect().run().toVector
     val dataset22 = dataset.select(u12(A, B), u22(B, C)).collect().run().toVector
@@ -118,9 +118,9 @@ class UdfTests extends TypedDatasetSuite {
     val u1 = udf[X3[A, B, C], A, B, C, C](f)
     val u2 = dataset.makeUDF(f)
 
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val dataset21 = dataset.select(u1(A, B, C)).collect().run().toVector
     val dataset22 = dataset.select(u2(A, B, C)).collect().run().toVector
@@ -140,9 +140,9 @@ class UdfTests extends TypedDatasetSuite {
     val u1 = udf[X3[A, B, C], A, B, C, A, C](f)
     val u2 = dataset.makeUDF(f)
 
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
 
     val dataset21 = dataset.select(u1(A, B, C, A)).collect().run().toVector
     val dataset22 = dataset.select(u2(A, B, C, A)).collect().run().toVector
@@ -162,11 +162,11 @@ class UdfTests extends TypedDatasetSuite {
     val u1 = udf[X5[A, B, C, D, E], A, B, C, D, E, C](f)
     val u2 = dataset.makeUDF(f)
 
-    val A = dataset.col[A]('a)
-    val B = dataset.col[B]('b)
-    val C = dataset.col[C]('c)
-    val D = dataset.col[D]('d)
-    val E = dataset.col[E]('e)
+    val A = dataset.col('a)
+    val B = dataset.col('b)
+    val C = dataset.col('c)
+    val D = dataset.col('d)
+    val E = dataset.col('e)
 
     val dataset21 = dataset.select(u1(A, B, C, D, E)).collect().run().toVector
     val dataset22 = dataset.select(u2(A, B, C, D, E)).collect().run().toVector
diff --git a/docs/src/main/tut/FeatureOverview.md b/docs/src/main/tut/FeatureOverview.md
index c829e320c..3ee023c74 100644
--- a/docs/src/main/tut/FeatureOverview.md
+++ b/docs/src/main/tut/FeatureOverview.md
@@ -84,7 +84,7 @@ aptDs.select('citi)
 aptTypedDs.select(aptTypedDs('surface) * 10, aptTypedDs('surface) + 2).show().run()
 ```
 
-Note that unlike the standard Spark API where some operations are lazy and some are not, **TypedDatasets have all operations to be lazy.**
+Note that unlike the standard Spark API, where some operations are lazy and some are not, **all TypedDataset operations are lazy.**
 In the above example, `show()` is lazy. It requires to apply `run()` for the
 `show` job to materialize. A more detailed explanation of `Job` is given [here](Job.md).
 
@@ -94,9 +94,9 @@ Next we compute the price by surface unit:
 val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
 ```
 
-As the error suggests, we can't divide a `TypedColumn` of `Double` by `Int.`
-For safety, in Frameless only math operations between same types is allowed.
-There are two ways to proceed here:
+As the error suggests, we can't divide a `TypedColumn` of `Double` by `Int`.
+For safety, Frameless only allows math operations between identical types.
+There are two ways to proceed here:
 
 (a) Explicitly cast `Int` to `Double` (manual)
 
@@ -121,20 +121,20 @@ Let's try to cast a `TypedColumn` of `String` to `Double`:
 aptTypedDs('city).cast[Double]
 ```
 
-The compile-time error tells us that to perform the cast, an evidence
-(in the form of `CatalystCast[String, Double]`) must be available.
-Since casting from `String` to `Double` is not allowed, this results
-in a compilation error.
+The compile-time error tells us that to perform the cast, evidence
+(in the form of `CatalystCast[String, Double]`) must be available.
+Since casting from `String` to `Double` is not allowed, this results
+in a compilation error.
 
-Check [here](https://github.com/typelevel/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala)
+Check [here](https://github.com/typelevel/frameless/blob/master/core/src/main/scala/frameless/CatalystCast.scala)
 for the set of available `CatalystCast.`
 
 ## TypeSafe TypedDataset casting and projections
 
 With `select()` the resulting TypedDataset is of type `TypedDataset[TupleN[...]]` (with N in `[1...10]`).
 For example, if we select three columns with types `String`, `Int`, and `Boolean` the result will have type
-`TypedDataset[(String, Int, Boolean)]`. To select more than ten columns use the `selectMany()` method.
-Select has better IDE support than the macro based selectMany, so prefer `select()` for the general case.
+`TypedDataset[(String, Int, Boolean)]`. To select more than ten columns use the `selectMany()` method.
+`select()` has better IDE support than the macro-based `selectMany()`, so prefer `select()` for the general case.
 
 We often want to give more expressive types to the result of our computations.
 `as[T]` allows us to safely cast a `TypedDataset[U]` to another of type `TypedDataset[T]` as long
@@ -259,23 +259,23 @@ cityPriceRatio.groupBy(cityPriceRatio('_1)).agg(avg(cityPriceRatio('_2))).show()
 ### Entire TypedDataset Aggregation
 
 We often want to aggregate the entire `TypedDataset` and skip the `groupBy()` clause.
-In `Frameless` you can do this using the `agg()` operator directly on the `TypedDataset`.
-In the following example, we compute the average price, the average surface,
-the minimum surface, and the set of cities for the entire dataset.
+In `Frameless` you can do this using the `agg()` operator directly on the `TypedDataset`.
+In the following example, we compute the average price, the average surface,
+the minimum surface, and the set of cities for the entire dataset.
 
 ```tut:book
 case class Stats(
-  avgPrice: Double,
-  avgSurface: Double,
-  minSurface: Int,
+  avgPrice: Double,
+  avgSurface: Double,
+  minSurface: Int,
   allCities: Vector[String])
-
+
 aptds.agg(
-  avg(aptds('price)),
+  avg(aptds('price)),
   avg(aptds('surface)),
   min(aptds('surface)),
   collectSet(aptds('city))
-).as[Stats].show().run()
+).as[Stats].show().run()
 ```
 
 ## Joins
@@ -295,7 +295,7 @@ val citiInfoTypedDS = TypedDataset.create(cityInfo)
 Here is how to join the population information to the apartment's dataset.
 
 ```tut:book
-val withCityInfo = aptTypedDs.join(citiInfoTypedDS, aptTypedDs('city), citiInfoTypedDS('name))
+val withCityInfo = aptTypedDs.joinInner(citiInfoTypedDS)(implicit a => aptTypedDs('city) === citiInfoTypedDS('name))
 
 withCityInfo.show().run()
 ```
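A natural follow-up for this doc page (a sketch; it assumes the `withCityInfo` value above and a `CityPopulationInfo(name: String, population: Int)` case class as in the surrounding docs) is flattening the joined tuple into a cleaner type with `colMany`:

```tut:book
case class AptPriceCity(city: String, aptPrice: Double, cityPopulation: Int)

withCityInfo.select(
  withCityInfo.colMany('_2, 'name),
  withCityInfo.colMany('_1, 'price),
  withCityInfo.colMany('_2, 'population)
).as[AptPriceCity].show().run()
```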