1 change: 1 addition & 0 deletions project/MimaExcludes.scala
@@ -317,6 +317,7 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.longRddToDataFrameHolder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.intRddToDataFrameHolder"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.GroupedDataset"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.Dataset.subtract"),

ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),
2 changes: 2 additions & 0 deletions python/pyspark/sql/column.py
@@ -315,6 +315,8 @@ def alias(self, *alias):
sc = SparkContext._active_spark_context
return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))

name = copy_func(alias, sinceversion=2.0, doc=":func:`name` is an alias for :func:`alias`.")

@ignore_unicode_prefix
@since(1.3)
def cast(self, dataType):
14 changes: 12 additions & 2 deletions python/pyspark/sql/dataframe.py
@@ -911,14 +911,24 @@ def agg(self, *exprs):
"""
return self.groupBy().agg(*exprs)

@since(2.0)
def union(self, other):
""" Return a new :class:`DataFrame` containing union of rows in this
frame and another frame.

This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
(that does deduplication of elements), use this function followed by a distinct.
"""
return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)

@since(1.3)
def unionAll(self, other):
""" Return a new :class:`DataFrame` containing union of rows in this
frame and another frame.

This is equivalent to `UNION ALL` in SQL.
.. note:: Deprecated in 2.0, use union instead.
"""
return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
return self.union(other)

@since(1.3)
def intersect(self, other):
29 changes: 22 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -856,7 +856,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.4.0
*/
def alias(alias: String): Column = as(alias)
def alias(alias: String): Column = name(alias)

/**
* Gives the column an alias.
@@ -871,12 +871,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.3.0
*/
def as(alias: String): Column = withExpr {
expr match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
case other => Alias(other, alias)()
}
}
def as(alias: String): Column = name(alias)

/**
* (Scala-specific) Assigns the given aliases to the results of a table generating function.
@@ -936,6 +931,26 @@ class Column(protected[sql] val expr: Expression) extends Logging {
Alias(expr, alias)(explicitMetadata = Some(metadata))
}

/**
* Gives the column a name (alias).
* {{{
* // Renames colA to colB in select output.
* df.select($"colA".name("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
Review thread on this doc comment:

Contributor: Is there any way to avoid duplicating documents at scala side?

Contributor Author: no unfortunately...

Contributor: Well, actually there is. You can use @define <name> <definition> and then reference it using $name later. See here. Scala standard library uses this trick extensively.

Contributor Author: does it work with the unidoc for java?

Contributor: Just checked, unfortunately it doesn't work for Java :(
*
* @group expr_ops
* @since 2.0.0
*/
def name(alias: String): Column = withExpr {
expr match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
case other => Alias(other, alias)()
}
}

/**
* Casts the column to a different data type.
* {{{
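A quick usage sketch of the Column change (illustrative only, assuming a DataFrame `df` with a column `colA`): `alias`, `as`, and `name` are now interchangeable for simple renames, all delegating to `name`.

```scala
import org.apache.spark.sql.functions.col

// Each select output has a single column named "colB"; any metadata
// attached to colA is propagated, since all three delegate to name().
df.select(col("colA").name("colB"))
df.select(col("colA").as("colB"))
df.select(col("colA").alias("colB"))
```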
30 changes: 11 additions & 19 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1350,20 +1350,24 @@ class Dataset[T] private[sql](
* @group typedrel
* @since 2.0.0
*/
def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan {
// This breaks caching, but it's usually ok because it addresses a very specific use case:
// using union to union many files or partitions.
CombineUnions(Union(logicalPlan, other.logicalPlan))
}
@deprecated("use union()", "2.0.0")
def unionAll(other: Dataset[T]): Dataset[T] = union(other)

/**
* Returns a new [[Dataset]] containing union of rows in this Dataset and another Dataset.
* This is equivalent to `UNION ALL` in SQL.
*
* To do a SQL-style set union (that does deduplication of elements), use this function followed
* by a [[distinct]].
*
* @group typedrel
* @since 2.0.0
*/
def union(other: Dataset[T]): Dataset[T] = unionAll(other)
def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
// This breaks caching, but it's usually ok because it addresses a very specific use case:
// using union to union many files or partitions.
CombineUnions(Union(logicalPlan, other.logicalPlan))
}

/**
* Returns a new [[Dataset]] containing rows only in both this Dataset and another Dataset.
@@ -1393,18 +1397,6 @@ class Dataset[T] private[sql](
Except(logicalPlan, other.logicalPlan)
}

/**
* Returns a new [[Dataset]] containing rows in this Dataset but not in another Dataset.
* This is equivalent to `EXCEPT` in SQL.
*
* Note that, equality checking is performed directly on the encoded representation of the data
* and thus is not affected by a custom `equals` function defined on `T`.
*
* @group typedrel
* @since 2.0.0
*/
def subtract(other: Dataset[T]): Dataset[T] = except(other)

/**
* Returns a new [[Dataset]] by sampling a fraction of rows.
*
@@ -1756,7 +1748,7 @@ class Dataset[T] private[sql](
outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
}

val row = agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
val row = groupBy().agg(aggExprs.head, aggExprs.tail: _*).head().toSeq

// Pivot the data so each summary is one row
row.grouped(outputCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
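A short usage sketch of the reshuffled Dataset set operations (assuming `ds1` and `ds2` are Datasets of the same type):

```scala
// union keeps duplicates, matching SQL's UNION ALL.
val combined = ds1.union(ds2)

// For a SQL-style UNION (deduplicated), follow union with distinct.
val deduped = ds1.union(ds2).distinct()

// unionAll still compiles but is deprecated in 2.0.
val legacy = ds1.unionAll(ds2)

// subtract is removed; except covers the EXCEPT semantics.
val onlyInDs1 = ds1.except(ds2)
```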
@@ -190,7 +190,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
def reduce(f: (V, V) => V): Dataset[(K, V)] = {
def reduceGroups(f: (V, V) => V): Dataset[(K, V)] = {
val func = (key: K, it: Iterator[V]) => Iterator((key, it.reduce(f)))

implicit val resultEncoder = ExpressionEncoder.tuple(unresolvedKEncoder, unresolvedVEncoder)
@@ -203,15 +203,10 @@ class KeyValueGroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
def reduce(f: ReduceFunction[V]): Dataset[(K, V)] = {
reduce(f.call _)
def reduceGroups(f: ReduceFunction[V]): Dataset[(K, V)] = {
reduceGroups(f.call _)
}

// This is here to prevent us from adding overloads that would be ambiguous.
@scala.annotation.varargs
private def agg(exprs: Column*): DataFrame =
groupedData.agg(withEncoder(exprs.head), exprs.tail.map(withEncoder): _*)

private def withEncoder(c: Column): Column = c match {
case tc: TypedColumn[_, _] =>
tc.withInputType(resolvedVEncoder.bind(dataAttributes), dataAttributes)
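For illustration, a hedged sketch of the renamed grouping API (assuming `spark.implicits._` is in scope), matching the pattern exercised in the test suites below:

```scala
val ds = Seq("abc", "xyz", "hello").toDS()

// reduceGroups replaces the old reduce: concatenate strings that share a length.
val agged = ds.groupByKey(_.length).reduceGroups(_ + _)
// Result rows: (3, "abcxyz"), (5, "hello")
```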
@@ -204,7 +204,7 @@ public Iterator<String> call(Integer key, Iterator<String> values) {

Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));

Dataset<Tuple2<Integer, String>> reduced = grouped.reduce(new ReduceFunction<String>() {
Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups(new ReduceFunction<String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + v2;
@@ -300,7 +300,7 @@ public void testSetOperation() {
Arrays.asList("abc", "abc", "xyz", "xyz", "foo", "foo", "abc", "abc", "xyz"),
unioned.collectAsList());

Dataset<String> subtracted = ds.subtract(ds2);
Dataset<String> subtracted = ds.except(ds2);
Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList());
}

@@ -105,10 +105,11 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
Row("a") :: Nil)
}

test("alias") {
test("alias and name") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
assert(df.select(df("a").as("b")).columns.head === "b")
assert(df.select(df("a").alias("b")).columns.head === "b")
assert(df.select(df("a").name("b")).columns.head === "b")
}

test("as propagates metadata") {
@@ -305,7 +305,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {

test("groupBy function, reduce") {
val ds = Seq("abc", "xyz", "hello").toDS()
val agged = ds.groupByKey(_.length).reduce(_ + _)
val agged = ds.groupByKey(_.length).reduceGroups(_ + _)

checkDataset(
agged,