Column.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types._
 
 /**
  * A column that will be computed based on the data in a `DataFrame`.
@@ -51,6 +51,12 @@ import org.apache.spark.sql.types.{DataType, Metadata}
  */
 class Column private[sql] (private[sql] val expr: proto.Expression) extends Logging {
 
+  private[sql] def this(name: String, planId: Option[Long]) =
+    this(Column.nameToExpression(name, planId))
+
+  private[sql] def this(name: String) =
+    this(name, None)
+
   private def fn(name: String): Column = Column.fn(name, this)
   private def fn(name: String, other: Column): Column = Column.fn(name, this, other)
   private def fn(name: String, other: Any): Column = Column.fn(name, this, lit(other))
@@ -1270,9 +1276,12 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logging {
 
 private[sql] object Column {
 
-  def apply(name: String): Column = Column(name, None)
+  def apply(name: String): Column = new Column(name)
 
-  def apply(name: String, planId: Option[Long]): Column = Column { builder =>
+  def apply(name: String, planId: Option[Long]): Column = new Column(name, planId)
+
+  def nameToExpression(name: String, planId: Option[Long] = None): proto.Expression = {
+    val builder = proto.Expression.newBuilder()
     name match {
       case "*" =>
         builder.getUnresolvedStarBuilder
@@ -1282,6 +1291,7 @@ private[sql] object Column {
         val attributeBuilder = builder.getUnresolvedAttributeBuilder.setUnparsedIdentifier(name)
         planId.foreach(attributeBuilder.setPlanId)
     }
+    builder.build()
   }
 
   private[sql] def apply(f: proto.Expression.Builder => Unit): Column = {
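
Note: the following is a minimal sketch (not part of this diff) of what the new nameToExpression helper produces. It assumes only the signature shown above; the object name NameToExpressionSketch is hypothetical, and the code must live in the org.apache.spark.sql package because the Column companion is private[sql].

package org.apache.spark.sql

object NameToExpressionSketch {
  def demo(): Unit = {
    // "*" builds an expression with an UnresolvedStar.
    val star = Column.nameToExpression("*")
    // Any other name builds an UnresolvedAttribute, optionally tagged
    // with the plan id of the DataFrame the column was resolved against.
    val attr = Column.nameToExpression("a.b", Some(1L))
    println(star)
    println(attr)
  }
}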
@@ -1302,3 +1312,121 @@ private[sql] object Column {
       .addAllArguments(inputs.map(_.expr).asJava)
   }
 }
+
+/**
+ * A convenient class used for constructing schema.
+ *
+ * @since 3.4.0
+ */
+class ColumnName(name: String) extends Column(name) {
+
+  /**
+   * Creates a new `StructField` of type boolean.
+   * @since 3.4.0
+   */
+  def boolean: StructField = StructField(name, BooleanType)
+
+  /**
+   * Creates a new `StructField` of type byte.
+   * @since 3.4.0
+   */
+  def byte: StructField = StructField(name, ByteType)
+
+  /**
+   * Creates a new `StructField` of type short.
+   * @since 3.4.0
+   */
+  def short: StructField = StructField(name, ShortType)
+
+  /**
+   * Creates a new `StructField` of type int.
+   * @since 3.4.0
+   */
+  def int: StructField = StructField(name, IntegerType)
+
+  /**
+   * Creates a new `StructField` of type long.
+   * @since 3.4.0
+   */
+  def long: StructField = StructField(name, LongType)
+
+  /**
+   * Creates a new `StructField` of type float.
+   * @since 3.4.0
+   */
+  def float: StructField = StructField(name, FloatType)
+
+  /**
+   * Creates a new `StructField` of type double.
+   * @since 3.4.0
+   */
+  def double: StructField = StructField(name, DoubleType)
+
+  /**
+   * Creates a new `StructField` of type string.
+   * @since 3.4.0
+   */
+  def string: StructField = StructField(name, StringType)
+
+  /**
+   * Creates a new `StructField` of type date.
+   * @since 3.4.0
+   */
+  def date: StructField = StructField(name, DateType)
+
+  /**
+   * Creates a new `StructField` of type decimal.
+   * @since 3.4.0
+   */
+  def decimal: StructField = StructField(name, DecimalType.USER_DEFAULT)
+
+  /**
+   * Creates a new `StructField` of type decimal.
+   * @since 3.4.0
+   */
+  def decimal(precision: Int, scale: Int): StructField =
+    StructField(name, DecimalType(precision, scale))
+
+  /**
+   * Creates a new `StructField` of type timestamp.
+   * @since 3.4.0
+   */
+  def timestamp: StructField = StructField(name, TimestampType)
+
+  /**
+   * Creates a new `StructField` of type binary.
+   * @since 3.4.0
+   */
+  def binary: StructField = StructField(name, BinaryType)
+
+  /**
+   * Creates a new `StructField` of type array.
+   * @since 3.4.0
+   */
+  def array(dataType: DataType): StructField = StructField(name, ArrayType(dataType))
+
+  /**
+   * Creates a new `StructField` of type map.
+   * @since 3.4.0
+   */
+  def map(keyType: DataType, valueType: DataType): StructField =
+    map(MapType(keyType, valueType))
+
+  /**
+   * Creates a new `StructField` of type map.
+   * @since 3.4.0
+   */
+  def map(mapType: MapType): StructField = StructField(name, mapType)
+
+  /**
+   * Creates a new `StructField` of type struct.
+   * @since 3.4.0
+   */
+  def struct(fields: StructField*): StructField = struct(StructType(fields))
+
+  /**
+   * Creates a new `StructField` of type struct.
+   * @since 3.4.0
+   */
+  def struct(structType: StructType): StructField = StructField(name, structType)
+}
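
For illustration, a small usage sketch (not part of this diff) showing how the ColumnName helpers compose into a schema. It uses only the API added above plus the standard StructType constructor; the object name SchemaSketch and the field names are made up.

import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.types.{StringType, StructType}

object SchemaSketch extends App {
  // Each ColumnName wraps a field name; the typed members turn it into a StructField.
  val schema = StructType(Seq(
    new ColumnName("id").long,
    new ColumnName("name").string,
    new ColumnName("price").decimal(20, 10),
    new ColumnName("tags").array(StringType)))
  println(schema.treeString)
}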

ColumnTestSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types._
 
 /**
  * Tests for client local Column behavior.
@@ -176,4 +176,36 @@ class ColumnTestSuite extends ConnectFunSuite {
       assert(explain1.contains(fragment))
     }
   }
+
+  private def testColName(dataType: DataType, f: ColumnName => StructField): Unit = {
+    test("ColumnName " + dataType.catalogString) {
+      val actual = f(new ColumnName("col"))
+      val expected = StructField("col", dataType)
+      assert(actual === expected)
+    }
+  }
+
+  testColName(BooleanType, _.boolean)
+  testColName(ByteType, _.byte)
+  testColName(ShortType, _.short)
+  testColName(IntegerType, _.int)
+  testColName(LongType, _.long)
+  testColName(FloatType, _.float)
+  testColName(DoubleType, _.double)
+  testColName(DecimalType.USER_DEFAULT, _.decimal)
+  testColName(DecimalType(20, 10), _.decimal(20, 10))
+  testColName(DateType, _.date)
+  testColName(TimestampType, _.timestamp)
+  testColName(StringType, _.string)
+  testColName(BinaryType, _.binary)
+  testColName(ArrayType(IntegerType), _.array(IntegerType))
+
+  private val mapType = MapType(StringType, StringType)
+  testColName(mapType, _.map(mapType))
+  testColName(MapType(StringType, IntegerType), _.map(StringType, IntegerType))
+
+  private val structType1 = new StructType().add("a", "int").add("b", "string")
+  private val structType2 = structType1.add("c", "binary")
+  testColName(structType1, _.struct(structType1))
+  testColName(structType2, _.struct(structType2.fields: _*))
 }
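
Aside: the suite above registers one test case per data type by invoking testColName while the class body is constructed. A generic sketch of that ScalaTest pattern follows (not from this diff; it assumes plain AnyFunSuite, on which ConnectFunSuite is presumably built, and the suite and helper names are hypothetical).

import org.scalatest.funsuite.AnyFunSuite

// Calling test(...) in the class body registers one test per invocation.
class GeneratedCasesSuite extends AnyFunSuite {
  private def testDoubling(n: Int): Unit =
    test(s"doubling $n") {
      assert(n * 2 == n + n)
    }

  Seq(1, 2, 3).foreach(testDoubling)
}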

CompatibilitySuite.scala
@@ -71,6 +71,7 @@ class CompatibilitySuite extends ConnectFunSuite {
     val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
     val includedRules = Seq(
       IncludeByName("org.apache.spark.sql.Column.*"),
+      IncludeByName("org.apache.spark.sql.ColumnName.*"),
       IncludeByName("org.apache.spark.sql.DataFrame.*"),
       IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
@@ -155,6 +156,7 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"),
 
       // RelationalGroupedDataset
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.apply"),

Review comment from Contributor Author:
@zhenlineo why didn't this fail during CI?

Reply from Member:
Thank you so much for fixing this, @hvanhovell! I also spent some time digging into this.

       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.as"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.pivot"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"),