@@ -1070,6 +1070,35 @@ class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
}

/**
* Groups the Dataset using the specified columns, so that we can run aggregation on them. See
* [[RelationalGroupedDataset]] for all the available aggregate functions.
*
* This is a variant of groupBy that can only group by existing columns using column names (i.e.
* cannot construct expressions).
*
* {{{
* // Compute the average for all numeric columns grouped by department.
* ds.groupBy("department").avg()
*
* // Compute the max age and average salary, grouped by department and gender.
* ds.groupBy($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
* @group untypedrel
* @since 3.4.0
*/
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
val colNames: Seq[String] = col1 +: cols
new RelationalGroupedDataset(
toDF(),
colNames.map(colName => Column(colName).expr),
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
}
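
A minimal usage sketch of the new string-based overload, assuming a Dataset `ds` with the columns used in the scaladoc example above; both calls should resolve to the same grouping:

// Group by column names (the overload added here) and aggregate with the Map form.
ds.groupBy("department", "gender").agg(Map("salary" -> "avg", "age" -> "max"))

// Equivalent call through the pre-existing Column-based overload.
ds.groupBy(Column("department"), Column("gender")).agg(Map("salary" -> "avg", "age" -> "max"))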

/**
* Create a multi-dimensional rollup for the current Dataset using the specified columns, so we
* can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate
@@ -1990,14 +2019,14 @@ class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val
viewName: String,
replace: Boolean,
global: Boolean): Unit = {
-  val command = session.newCommand { builder =>
+  val command = sparkSession.newCommand { builder =>
Review comments on this line:

Contributor: Ah yes, I guess I broke the build...

Contributor Author: Or we use this PR to fix the build, depending on you.
builder.getCreateDataframeViewBuilder
.setInput(plan.getRoot)
.setName(viewName)
.setIsGlobal(global)
.setReplace(replace)
}
-  session.execute(command)
+  sparkSession.execute(command)
}

/**
@@ -1892,6 +1892,12 @@ class PlanGenerationTestSuite
"a" -> "count")
}

test("groupby agg string") {
simple
.groupBy("id", "b")
.agg("a" -> "max", "a" -> "count")
}
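
A side note on the pair-based agg used in this test: it can aggregate the same column with two different functions, whereas the Map-based form documented in the groupBy scaladoc cannot, because duplicate keys collapse. A small sketch against the suite's `simple` dataset:

// Pair-based form: both max(a) and count(a) survive into the plan.
simple.groupBy("id", "b").agg("a" -> "max", "a" -> "count")

// Map-based form: the second entry for "a" overwrites the first,
// so only count(a) would be produced.
simple.groupBy("id", "b").agg(Map("a" -> "max", "a" -> "count"))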

test("groupby agg columns") {
simple
.groupBy(Column("id"))
@@ -0,0 +1,2 @@
Aggregate [id#0L, b#0], [id#0L, b#0, max(a#0) AS max(a)#0, count(a#0) AS count(a)#0L]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
@@ -0,0 +1,46 @@
{
"common": {
"planId": "1"
},
"aggregate": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double\u003e"
}
},
"groupType": "GROUP_TYPE_GROUPBY",
"groupingExpressions": [{
"unresolvedAttribute": {
"unparsedIdentifier": "id"
}
}, {
"unresolvedAttribute": {
"unparsedIdentifier": "b"
}
}],
"aggregateExpressions": [{
"unresolvedFunction": {
"functionName": "max",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a",
"planId": "0"
}
}]
}
}, {
"unresolvedFunction": {
"functionName": "count",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a",
"planId": "0"
}
}]
}
}]
}
}
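
For orientation, a hedged sketch of how an Aggregate relation like the one above could be assembled directly with the generated proto builders. It assumes the standard protobuf-java builder API and the `org.apache.spark.connect.proto` package, and it omits the localRelation input for brevity; field names are taken from the JSON above:

import org.apache.spark.connect.proto

// An unresolved attribute reference, mirroring the "unresolvedAttribute" entries above.
def attr(name: String): proto.Expression =
  proto.Expression.newBuilder()
    .setUnresolvedAttribute(
      proto.Expression.UnresolvedAttribute.newBuilder().setUnparsedIdentifier(name))
    .build()

// An unresolved function call, mirroring the "unresolvedFunction" entries above.
def call(fnName: String, arg: proto.Expression): proto.Expression =
  proto.Expression.newBuilder()
    .setUnresolvedFunction(
      proto.Expression.UnresolvedFunction.newBuilder()
        .setFunctionName(fnName)
        .addArguments(arg))
    .build()

// Grouping on id and b, aggregating max(a) and count(a), as in the golden file.
val aggregate = proto.Aggregate.newBuilder()
  .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
  .addGroupingExpressions(attr("id"))
  .addGroupingExpressions(attr("b"))
  .addAggregateExpressions(call("max", attr("a")))
  .addAggregateExpressions(call("count", attr("a")))
  .build()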
Binary file not shown.