Closed
54 commits
d0e2f99
date functions
tarekbecker Jun 24, 2015
5ebb235
resolved naming conflict
tarekbecker Jun 24, 2015
4d8049b
fixed tests and added type check
tarekbecker Jun 24, 2015
638596f
improved codegen
tarekbecker Jun 24, 2015
849fb41
fixed stupid test
tarekbecker Jun 24, 2015
c739788
added support for quarter SPARK-8178
tarekbecker Jun 24, 2015
b680db6
added codegeneration to all functions
tarekbecker Jun 24, 2015
a5ea120
added python api; changed test to be more meaningful
tarekbecker Jun 24, 2015
02efc5d
removed doubled code
tarekbecker Jun 26, 2015
356df78
rely on cast mechanism of Spark. Simplified implementation
tarekbecker Jun 29, 2015
3bfac90
fixed style
tarekbecker Jun 29, 2015
5fe74e1
fixed python style
tarekbecker Jun 29, 2015
a8edebd
use Calendar instead of SimpleDateFormat
tarekbecker Jun 29, 2015
f120415
improved runtime
tarekbecker Jun 30, 2015
eb6760d
Merge branch 'master' into SPARK-8199
tarekbecker Jul 4, 2015
5a105d9
[SPARK-8199] rebase after #6985 got merged
tarekbecker Jul 4, 2015
7bc9d93
Merge branch 'master' into SPARK-8199
tarekbecker Jul 9, 2015
d9f8ac3
[SPARK-8199] implement fast track
tarekbecker Jul 9, 2015
6f5d95c
[SPARK-8199] fixed year interval
tarekbecker Jul 9, 2015
f3e7a9f
[SPARK-8199] revert change in DataFrameFunctionsSuite
tarekbecker Jul 9, 2015
7d9f0eb
[SPARK-8199] git renaming issue
tarekbecker Jul 9, 2015
10e4ad1
Merge branch 'master' into date-functions-fast
tarekbecker Jul 9, 2015
ccb723c
[SPARK-8199] style and fixed merge issues
tarekbecker Jul 9, 2015
c42b444
Removed merge conflict file
tarekbecker Jul 9, 2015
ad17e96
improved implementation
tarekbecker Jul 10, 2015
f775f39
fixed return type
tarekbecker Jul 10, 2015
1a436c9
wip
tarekbecker Jul 13, 2015
4fb66da
WIP: date functions on calculation only
tarekbecker Jul 13, 2015
740af0e
implement date function using a calculation based on days
tarekbecker Jul 13, 2015
1358cdc
Merge remote-tracking branch 'origin/master' into SPARK-8199
tarekbecker Jul 16, 2015
ec87c69
[SPARK-8119] bug fixing and refactoring
tarekbecker Jul 16, 2015
0852655
[SPARK-8119] changed from ExpectsInputTypes to implicit casts
tarekbecker Jul 16, 2015
1b2e540
[SPARK-8119] style fix
tarekbecker Jul 16, 2015
b382267
[SPARK-8199] fixed bug in day calculation; removed set TimeZone in Hi…
tarekbecker Jul 17, 2015
d6aa14e
[SPARK-8199] fixed Hive compatibility
tarekbecker Jul 17, 2015
e223bc0
[SPARK-8199] refactoring
tarekbecker Jul 17, 2015
56c4a92
[SPARK-8199] update python docu
tarekbecker Jul 17, 2015
d01b977
[SPARK-8199] python underscore
tarekbecker Jul 17, 2015
2259299
[SPARK-8199] day_of_month alias
tarekbecker Jul 17, 2015
523542d
[SPARK-8199] address comments
tarekbecker Jul 17, 2015
0ad6db8
[SPARK-8199] minor fix
tarekbecker Jul 17, 2015
746b80a
[SPARK-8199] build fix
tarekbecker Jul 17, 2015
cdfae27
[SPARK-8199] cleanup & python docstring fix
tarekbecker Jul 17, 2015
fb98ba0
[SPARK-8199] python docstring fix
tarekbecker Jul 17, 2015
3c6ae2e
[SPARK-8199] removed binary search
tarekbecker Jul 18, 2015
70238e0
Merge branch 'master' into SPARK-8199
tarekbecker Jul 18, 2015
ea6c110
[SPARK-8199] fix after merging master
tarekbecker Jul 18, 2015
4afc09c
[SPARK-8199] concise leap year handling
tarekbecker Jul 18, 2015
6e0c78f
[SPARK-8199] removed setTimeZone in tests, according to cloud-fans co…
tarekbecker Jul 18, 2015
5983dcc
[SPARK-8199] whitespace fix
tarekbecker Jul 18, 2015
256c357
[SPARK-8199] code cleanup
tarekbecker Jul 18, 2015
3e095ba
[SPARK-8199] style and timezone fix
tarekbecker Jul 18, 2015
bb567b6
[SPARK-8199] fixed test
tarekbecker Jul 18, 2015
f7b4c8c
[SPARK-8199] fixed bug in tests
tarekbecker Jul 19, 2015
150 changes: 150 additions & 0 deletions python/pyspark/sql/functions.py
@@ -652,6 +652,156 @@ def ntile(n):
    return Column(sc._jvm.functions.ntile(int(n)))


@ignore_unicode_prefix
@since(1.5)
Contributor

@ignore_unicode_prefix

def date_format(dateCol, format):
    """
    Converts a date/timestamp/string to a string value in the format specified by the date
    format given by the second argument.

    A pattern could be for instance `dd.MM.yyyy` and could return a string like '18.03.1993'. All
    pattern letters of the Java class `java.text.SimpleDateFormat` can be used.

    NOTE: Whenever possible, use specialized functions like `year`. These benefit from a
    specialized implementation.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(date_format('a', 'MM/dd/yyyy').alias('date')).collect()
    [Row(date=u'04/08/2015')]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.date_format(dateCol, format))


@since(1.5)
def year(col):
    """
    Extract the year of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(year('a').alias('year')).collect()
    [Row(year=2015)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.year(col))


@since(1.5)
def quarter(col):
    """
    Extract the quarter of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(quarter('a').alias('quarter')).collect()
    [Row(quarter=2)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.quarter(col))


@since(1.5)
def month(col):
    """
    Extract the month of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(month('a').alias('month')).collect()
    [Row(month=4)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.month(col))


@since(1.5)
def day(col):
    """
    Extract the day of the month of a given date as integer.

    >>> sqlContext.createDataFrame([('2015-04-08',)], ['a']).select(day('a').alias('day')).collect()
    [Row(day=8)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.day(col))


@since(1.5)
def day_of_month(col):
    """
    Extract the day of the month of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(day_of_month('a').alias('day')).collect()
    [Row(day=8)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.day_of_month(col))


@since(1.5)
def day_in_year(col):
    """
    Extract the day of the year of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(day_in_year('a').alias('day')).collect()
    [Row(day=98)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.day_in_year(col))


@since(1.5)
def hour(col):
    """
    Extract the hours of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
    >>> df.select(hour('a').alias('hour')).collect()
    [Row(hour=13)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.hour(col))


@since(1.5)
def minute(col):
    """
    Extract the minutes of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
    >>> df.select(minute('a').alias('minute')).collect()
    [Row(minute=8)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.minute(col))


@since(1.5)
def second(col):
    """
    Extract the seconds of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
    >>> df.select(second('a').alias('second')).collect()
    [Row(second=15)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.second(col))


@since(1.5)
def week_of_year(col):
    """
    Extract the week number of a given date as integer.

    >>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(week_of_year('a').alias('week')).collect()
    [Row(week=15)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.week_of_year(col))


class UserDefinedFunction(object):
"""
User defined function in Python
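
The doctests added in this file all use the date 2015-04-08 (with 13:08:15 for the time fields). As a sanity check that needs no Spark session, the expected values can be reproduced with Python's standard `datetime` module. This is only a sketch: `expected_fields` is a hypothetical helper, not part of PySpark, and it assumes that `week_of_year` follows ISO 8601 week numbering.

```python
from datetime import datetime


def expected_fields(text):
    """Hypothetical helper: compute the values the new doctests expect."""
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return {
        "year": ts.year,
        "quarter": (ts.month - 1) // 3 + 1,    # months 1-3 -> 1, 4-6 -> 2, ...
        "month": ts.month,
        "day_of_month": ts.day,
        "day_in_year": ts.timetuple().tm_yday,
        "hour": ts.hour,
        "minute": ts.minute,
        "second": ts.second,
        "week_of_year": ts.isocalendar()[1],   # assumption: ISO 8601 weeks
    }


fields = expected_fields("2015-04-08 13:08:15")
```

The resulting dictionary can be compared field by field with the `Row(...)` values in the docstrings.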
@@ -180,7 +180,19 @@ object FunctionRegistry {

    // datetime functions
    expression[CurrentDate]("current_date"),
-    expression[CurrentTimestamp]("current_timestamp")
+    expression[CurrentTimestamp]("current_timestamp"),
    expression[DateFormatClass]("date_format"),
    expression[Day]("day"),
Contributor Author

@rxin In Jira you mentioned there should be an alias. Can I just add expression[Day]("day_of_month")?

Contributor

yes

Contributor

btw please sort the expressions alphabetically

    expression[DayInYear]("day_in_year"),
    expression[Day]("day_of_month"),
    expression[Hour]("hour"),
    expression[Month]("month"),
    expression[Minute]("minute"),
    expression[Quarter]("quarter"),
    expression[Second]("second"),
    expression[WeekOfYear]("week_of_year"),
    expression[Year]("year")

  )

val builtin: FunctionRegistry = {
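
The review thread in this hunk settles that `day_of_month` is registered as a second name for the same `Day` expression. The idea of a name-to-implementation lookup in which two names share one entry can be sketched in plain Python as follows. The `registry` dict, the `call` helper, and the stand-in functions are illustrative assumptions, not Spark's `FunctionRegistry` API.

```python
from datetime import date


def day_of_month(d):
    """Stand-in for the Day expression: day of the month of a date."""
    return d.day


# Hypothetical registry: SQL function name -> implementation.
# "day" and "day_of_month" are aliases for the same callable.
registry = {
    "day": day_of_month,
    "day_of_month": day_of_month,
    "month": lambda d: d.month,
    "year": lambda d: d.year,
}


def call(name, *args):
    """Look a function up by name and invoke it."""
    try:
        func = registry[name]
    except KeyError:
        raise ValueError("undefined function: " + name)
    return func(*args)


d = date(2015, 4, 8)
result_day = call("day", d)
result_alias = call("day_of_month", d)
```

Because both keys point at the same object, any later change to the implementation is picked up by both names.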
@@ -17,9 +17,15 @@

package org.apache.spark.sql.catalyst.expressions

import java.sql.Date
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Returns the current date at the start of query evaluation.
@@ -54,3 +60,203 @@ case class CurrentTimestamp() extends LeafExpression {
    System.currentTimeMillis() * 1000L
  }
}

case class Hour(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(timestamp: Any): Any = {
    DateTimeUtils.getHours(timestamp.asInstanceOf[Long])
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getHours($c)"""
    )
  }
}

case class Minute(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(timestamp: Any): Any = {
    DateTimeUtils.getMinutes(timestamp.asInstanceOf[Long])
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getMinutes($c)"""
    )
  }
}

case class Second(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(timestamp: Any): Any = {
    DateTimeUtils.getSeconds(timestamp.asInstanceOf[Long])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getSeconds($c)"""
    )
  }
}
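
`Hour`, `Minute` and `Second` receive the timestamp as a `Long` and delegate to `DateTimeUtils`. Assuming that value is microseconds since the Unix epoch (consistent with `System.currentTimeMillis() * 1000L` in `CurrentTimestamp`), the field extraction could be sketched as below. The sketch works in UTC only and ignores any time zone handling the real `DateTimeUtils` may perform; the Python function names are illustrative.

```python
import calendar

MICROS_PER_SECOND = 1000 * 1000
SECONDS_PER_DAY = 24 * 3600


def seconds_of_day(micros):
    """Seconds elapsed since midnight UTC for a microsecond timestamp."""
    # Floor division and a non-negative modulo keep pre-1970 (negative)
    # timestamps on the correct side of midnight.
    return (micros // MICROS_PER_SECOND) % SECONDS_PER_DAY


def get_hours(micros):
    return seconds_of_day(micros) // 3600


def get_minutes(micros):
    return (seconds_of_day(micros) % 3600) // 60


def get_seconds(micros):
    return seconds_of_day(micros) % 60


# 2015-04-08 13:08:15 UTC as microseconds since the epoch.
ts = calendar.timegm((2015, 4, 8, 13, 8, 15)) * MICROS_PER_SECOND
hms = (get_hours(ts), get_minutes(ts), get_seconds(ts))
```

Working on the integer directly avoids allocating a calendar or date object per row, which is presumably why the expressions call static helpers rather than `java.util.Calendar`.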

case class DayInYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override def prettyName: String = "day_in_year"

  override protected def nullSafeEval(date: Any): Any = {
    DateTimeUtils.getDayInYear(date.asInstanceOf[Int])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getDayInYear($c)"""
    )
  }
}


case class Year(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(date: Any): Any = {
    DateTimeUtils.getYear(date.asInstanceOf[Int])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getYear($c)"""
    )
  }
}

case class Quarter(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(date: Any): Any = {
    DateTimeUtils.getQuarter(date.asInstanceOf[Int])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getQuarter($c)"""
    )
  }
}

case class Month(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(date: Any): Any = {
    DateTimeUtils.getMonth(date.asInstanceOf[Int])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getMonth($c)"""
    )
  }
}

case class Day(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override protected def nullSafeEval(date: Any): Any = {
    DateTimeUtils.getDayOfMonth(date.asInstanceOf[Int])
  }

  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    defineCodeGen(ctx, ev, (c) =>
      s"""$dtu.getDayOfMonth($c)"""
    )
  }
}
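
The date expressions (`DayInYear`, `Year`, `Quarter`, `Month`, `Day`) take the date as an `Int`, which is assumed here to be the number of days since 1970-01-01 (consistent with the `date * 1000L * 3600L * 24L` conversion in `WeekOfYear`). The commit list mentions "a calculation based on days"; the helpers in `DateTimeUtils` are not shown in this diff, so the sketch below substitutes a well-known integer-only days-to-civil-date conversion for the proleptic Gregorian calendar. It is not the PR's code, and the function names are illustrative.

```python
from datetime import date, timedelta


def civil_from_days(days):
    """Convert days since 1970-01-01 to (year, month, day), integers only."""
    z = days + 719468                 # shift the origin to 0000-03-01
    era = z // 146097                 # 400-year cycles of 146097 days
    doe = z - era * 146097            # day of era, 0..146096
    yoe = (doe - doe // 1460 + doe // 36524 - doe // 146096) // 365
    doy = doe - (365 * yoe + yoe // 4 - yoe // 100)  # day of March-based year
    mp = (5 * doy + 2) // 153         # March-based month index, 0..11
    day = doy - (153 * mp + 2) // 5 + 1
    month = mp + 3 if mp < 10 else mp - 9
    # January and February belong to the following civil year.
    year = yoe + era * 400 + (1 if month <= 2 else 0)
    return year, month, day


def quarter(days):
    _, month, _ = civil_from_days(days)
    return (month - 1) // 3 + 1


# Cross-check against the standard library for 1900-01-01 .. 2100-12-31.
epoch = date(1970, 1, 1)
start = (date(1900, 1, 1) - epoch).days
end = (date(2100, 12, 31) - epoch).days
mismatches = [
    n for n in range(start, end + 1)
    if civil_from_days(n) != (epoch + timedelta(days=n)).timetuple()[:3]
]
```

Starting the internal year in March puts the leap day at the end of the year, so month lengths follow a fixed pattern and no per-month lookup table is needed.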

case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
Contributor

make sure you override prettyName here too


  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

  override def dataType: DataType = IntegerType

  override def prettyName: String = "week_of_year"

  override protected def nullSafeEval(date: Any): Any = {
    val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    c.setFirstDayOfWeek(Calendar.MONDAY)
    c.setMinimalDaysInFirstWeek(4)
    c.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L)
    c.get(Calendar.WEEK_OF_YEAR)
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
    nullSafeCodeGen(ctx, ev, (time) => {
      val cal = classOf[Calendar].getName
      val c = ctx.freshName("cal")
      s"""
        $cal $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
        $c.setFirstDayOfWeek($cal.MONDAY);
        $c.setMinimalDaysInFirstWeek(4);
        $c.setTimeInMillis($time * 1000L * 3600L * 24L);
        ${ev.primitive} = $c.get($cal.WEEK_OF_YEAR);
      """
    })
}
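
`WeekOfYear` configures the calendar with Monday as the first day of the week and at least four days in the first week, which corresponds to ISO 8601 week numbering. The same rule can be sketched in Python without a calendar object: a week belongs to the year that contains its Thursday. The function below is an illustration, not a port of the Scala code, and it assumes the input is days since 1970-01-01.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def week_of_year(days_since_epoch):
    """ISO 8601 week number for a date given as days since 1970-01-01."""
    d = EPOCH + timedelta(days=days_since_epoch)
    # The Thursday of d's Monday-based week always lies in the year the
    # week is counted in (a week needs at least 4 days in that year).
    thursday = d + timedelta(days=3 - d.weekday())  # weekday(): Monday == 0
    jan1 = date(thursday.year, 1, 1)
    return (thursday - jan1).days // 7 + 1


apr_8_2015 = (date(2015, 4, 8) - EPOCH).days
week = week_of_year(apr_8_2015)
```

Under this rule the first days of January can still fall in week 52 or 53 of the previous year; 2016-01-01, a Friday, is in week 53.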

case class DateFormatClass(left: Expression, right: Expression) extends BinaryExpression
Contributor

why not just DateFormat?

Contributor

make sure you override prettyName and use "date_format"

Contributor Author

Name has to be DateFormatClass else it is not possible to build. There was somehow a naming conflict.

  with ImplicitCastInputTypes {

  override def dataType: DataType = StringType

  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)

  override def prettyName: String = "date_format"

  override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
    val sdf = new SimpleDateFormat(format.toString)
    UTF8String.fromString(sdf.format(new Date(timestamp.asInstanceOf[Long] / 1000)))
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val sdf = classOf[SimpleDateFormat].getName
    defineCodeGen(ctx, ev, (timestamp, format) => {
      s"""UTF8String.fromString((new $sdf($format.toString()))
        .format(new java.sql.Date($timestamp / 1000)))"""
    })
  }
}
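
`DateFormatClass` divides the timestamp by 1000 before handing it to `java.sql.Date`, which suggests a conversion from microseconds to milliseconds, and then formats it with a `SimpleDateFormat` pattern. A rough Python analogue is sketched below. Python has no `SimpleDateFormat`, so `to_strftime` is a hypothetical and deliberately tiny translation that covers only six pattern runs. The sketch also formats in UTC, whereas `SimpleDateFormat` uses the JVM default time zone unless told otherwise.

```python
import calendar
from datetime import datetime, timedelta, timezone

# Hypothetical mapping from a few SimpleDateFormat pattern runs to
# strftime directives; the real class supports many more letters.
_PATTERNS = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d"),
             ("HH", "%H"), ("mm", "%M"), ("ss", "%S")]

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_strftime(java_pattern):
    """Translate a (very restricted) Java date pattern to strftime syntax."""
    out = java_pattern
    for java, py in _PATTERNS:
        out = out.replace(java, py)
    return out


def date_format(micros, java_pattern):
    """Format a microsecond UTC timestamp with a Java-style pattern."""
    ts = _EPOCH + timedelta(microseconds=micros)
    return ts.strftime(to_strftime(java_pattern))


# Midnight UTC on 2015-04-08, in microseconds since the epoch.
midnight = calendar.timegm((2015, 4, 8, 0, 0, 0)) * 1000 * 1000
formatted = date_format(midnight, "MM/dd/yyyy")
```

Note that the Scala version builds a new `SimpleDateFormat` for every row; caching the formatter when the pattern is a constant would be a natural follow-up, though nothing in this diff does so.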