1 change: 0 additions & 1 deletion pom.xml
@@ -156,7 +156,6 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
<jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
12 changes: 11 additions & 1 deletion project/MimaExcludes.scala
@@ -54,7 +54,17 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution")
excludePackage("org.apache.spark.sql.execution"),
// NanoTime and CatalystTimestampConverter are only used inside catalyst
// and are not needed anymore
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter$")
)
case v if v.startsWith("1.4") =>
Seq(
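For context, MiMa (the Migration Manager) flags binary incompatibilities against the last released artifacts, and each ProblemFilters entry above suppresses one reported problem. A minimal sketch of how such a filter list is assembled, with illustrative class names that are not from this PR:

    import com.typesafe.tools.mima.core._

    // One exclusion per reported problem; the `$` suffix covers the synthetic
    // class generated for a Scala companion object.
    val exampleExcludes = Seq(
      ProblemFilters.exclude[MissingClassProblem]("org.example.RemovedClass"),
      ProblemFilters.exclude[MissingClassProblem]("org.example.RemovedClass$")
    )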
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst

import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -272,18 +272,18 @@ object CatalystTypeConverters {
}

private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
}

private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Timestamp): Long =
DateUtils.fromJavaTimestamp(scalaValue)
DateTimeUtils.fromJavaTimestamp(scalaValue)
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
toScala(row.getLong(column))
}
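The two converters above define the external/internal boundary for dates and timestamps: java.sql.Date maps to an Int of days since the epoch, and java.sql.Timestamp to a Long of 100ns units. A round-trip sketch, assuming only the DateTimeUtils methods shown in this diff:

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Date <-> Int (days since 1970-01-01, in the local time zone)
    val d = Date.valueOf("1970-01-02")
    val days: Int = DateTimeUtils.fromJavaDate(d)              // 1
    assert(DateTimeUtils.toJavaDate(days).toString == "1970-01-02")

    // Timestamp <-> Long (100ns units since the epoch)
    val t = Timestamp.valueOf("2015-05-08 08:10:25")
    val hundredNanos: Long = DateTimeUtils.fromJavaTimestamp(t)
    assert(DateTimeUtils.toJavaTimestamp(hundredNanos) == t)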
@@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}

@@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
@@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, s =>
try DateUtils.fromJavaDate(Date.valueOf(s.toString))
try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// Throw away precision finer than seconds, matching Hive.
// Timestamp.nanos ranges over 0 to 999,999,999, i.e. less than a second.
buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare results when Hive returns an exception,
// so we can return null
@@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (DateType, StringType) =>
defineCodeGen(ctx, ev, c =>
s"""${ctx.stringType}.fromString(
org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
// Special handling required for timestamps in hive test cases since the toString function
// does not match the expected output.
case (TimestampType, StringType) =>
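The unit arithmetic in these casts is easy to misread: internally a timestamp counts 100ns units, so 1 ms = 10,000 units, which is where the `* 10000` and `/ 10000L` factors come from. A sketch of the scaling, with constants restated for illustration (the real millisToDays/toMillisSinceEpoch also apply a local-timezone offset):

    val hundredNanosPerMilli = 10000L       // 1 ms = 1,000,000 ns = 10,000 * 100ns
    val millisPerDay = 86400000L

    // timestamp (100ns units) -> days since epoch, ignoring the timezone offset
    def timestampToDays(t: Long): Long = t / hundredNanosPerMilli / millisPerDay

    // days since epoch -> timestamp (100ns units), ignoring the timezone offset
    def daysToTimestamp(d: Int): Long = d * millisPerDay * hundredNanosPerMilli

    assert(timestampToDays(daysToTimestamp(1)) == 1L)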
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -39,8 +39,8 @@ object Literal {
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
case _ =>
@@ -17,18 +17,28 @@

package org.apache.spark.sql.catalyst.util

import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.Cast

/**
* Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
* Helper functions for converting between internal and external date and time representations.
* Dates are exposed externally as java.sql.Date and are represented internally as the number of
* days since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
* and are stored internally as longs, which are capable of storing timestamps with 100-nanosecond
* precision.
*/
object DateUtils {
private val MILLIS_PER_DAY = 86400000
private val HUNDRED_NANOS_PER_SECOND = 10000000L
object DateTimeUtils {
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

// see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100


// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
Expand Down Expand Up @@ -117,4 +127,25 @@ object DateUtils {
0L
}
}

/**
* Returns the number of 100ns intervals (hundreds of nanoseconds) since the epoch,
* given a Julian day and the nanoseconds within that day.
*/
def fromJulianDay(day: Int, nanoseconds: Long): Long = {
// use Long to avoid rounding errors
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
}

/**
* Returns the Julian day and the nanoseconds within that day, given the number of
* 100ns intervals (hundreds of nanoseconds) since the epoch.
*/
def toJulianDay(num100ns: Long): (Int, Long) = {
val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
val secondsInDay = seconds % SECONDS_PER_DAY
val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
(day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
}
}
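The SECONDS_PER_DAY / 2 terms come from Julian days beginning at noon: 1970-01-01T00:00 UTC is Julian day 2440587.5, which the code represents as JULIAN_DAY_OF_EPOCH = 2440587 plus half a day. A self-contained check of the forward conversion, restating the constants from the object above:

    val JULIAN_DAY_OF_EPOCH = 2440587L
    val SECONDS_PER_DAY = 86400L
    val HUNDRED_NANOS_PER_SECOND = 10000000L

    def fromJulianDay(day: Int, nanoseconds: Long): Long = {
      val seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
      seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
    }

    // Julian day 2440588 starts at 1970-01-01T12:00 UTC, 43,200 s after the epoch:
    assert(fromJulianDay(2440588, 0L) == 43200L * HUNDRED_NANOS_PER_SECOND)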
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Timestamp, Date}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/**
@@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
checkEvaluation(cast(cast(d, StringType), DateType), 0)
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))

// all convert to string type to check
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.002f)
checkEvaluation(cast(ts, DoubleType), 15.002)
checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
millis.toFloat / 1000)
@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{IntegerType, BooleanType}


@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)

val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
checkEvaluation(Literal(d1) < Literal(d2), true)

val ts1 = new Timestamp(12)
@@ -23,7 +23,7 @@ import java.util.Arrays
import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val row = new SpecificMutableRow(fieldTypes)
row.setLong(0, 0)
row.setString(1, "Hello")
row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))

val sizeRequired: Int = converter.getSizeRequirement(row)
sizeRequired should be (8 + (8 * 4) +
@@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
unsafeRow.getLong(0) should be (0)
unsafeRow.getString(1) should be ("Hello")
// Date is represented as Int in unsafeRow
DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
// Timestamp is represented as Long in unsafeRow
DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
(Timestamp.valueOf("2015-05-08 08:10:25"))
}

@@ -21,19 +21,31 @@ import java.sql.Timestamp

import org.apache.spark.SparkFunSuite

class DateUtilsSuite extends SparkFunSuite {
class DateTimeUtilsSuite extends SparkFunSuite {

test("timestamp") {
test("timestamp and 100ns") {
val now = new Timestamp(System.currentTimeMillis())
now.setNanos(100)
val ns = DateUtils.fromJavaTimestamp(now)
assert(ns % 10000000L == 1)
assert(DateUtils.toJavaTimestamp(ns) == now)
val ns = DateTimeUtils.fromJavaTimestamp(now)
assert(ns % 10000000L === 1)
assert(DateTimeUtils.toJavaTimestamp(ns) === now)

List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
val ts = DateUtils.toJavaTimestamp(t)
assert(DateUtils.fromJavaTimestamp(ts) == t)
assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
val ts = DateTimeUtils.toJavaTimestamp(t)
assert(DateTimeUtils.fromJavaTimestamp(ts) === t)
assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts)
}
}

test("100ns and julian day") {
Reviewer comment (Contributor) — a round-trip fuzz sketch follows this file's diff:
Are there any other inputs that are worth testing here? It wouldn't be super hard to fuzz this using the invariant that some of these methods should be inverses.

val (d, ns) = DateTimeUtils.toJulianDay(0)
assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH)
assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
assert(DateTimeUtils.fromJulianDay(d, ns) == 0L)

val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t))
val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1))
assert(t.equals(t2))
}
}
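Following the reviewer comment above, the inverse invariant is straightforward to fuzz. A hedged ScalaCheck sketch (ScalaCheck as a test dependency is an assumption here; non-negative inputs sidestep the truncation-toward-zero behavior of integer division on negative longs):

    import org.scalacheck.{Gen, Prop}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Property: fromJulianDay is a left inverse of toJulianDay.
    val roundTrip = Prop.forAll(Gen.choose(0L, Long.MaxValue / 1000)) { t =>
      val (day, nanos) = DateTimeUtils.toJulianDay(t)
      DateTimeUtils.fromJulianDay(day, nanos) == t
    }
    roundTrip.check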
5 changes: 0 additions & 5 deletions sql/core/pom.xml
@@ -73,11 +73,6 @@
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>${jodd.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -34,7 +34,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -148,8 +148,8 @@ object EvaluatePython {

case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)

case (date: Int, DateType) => DateUtils.toJavaDate(date)
case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
case (s: UTF8String, StringType) => s.toString

// Pyrolite can handle Timestamp and Decimal
@@ -188,12 +188,12 @@ object EvaluatePython {
}): Row

case (c: java.util.Calendar, DateType) =>
DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))

case (c: java.util.Calendar, TimestampType) =>
c.getTimeInMillis * 10000L
case (t: java.sql.Timestamp, TimestampType) =>
DateUtils.fromJavaTimestamp(t)
DateTimeUtils.fromJavaTimestamp(t)

case (_, udt: UserDefinedType[_]) =>
fromJava(obj, udt.sqlType)
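The Calendar branches above rely on the same scaling: Calendar.getTimeInMillis is milliseconds since the epoch, and multiplying by 10,000 yields the internal 100ns count. A minimal sketch (the helper name is hypothetical, not part of this PR):

    import java.util.Calendar

    // 1 ms = 10,000 * 100ns, so millis -> internal timestamp is a single multiply.
    def calendarToCatalystTimestamp(c: Calendar): Long = c.getTimeInMillis * 10000L

    val cal = Calendar.getInstance()
    cal.setTimeInMillis(1000L)                         // one second after the epoch
    assert(calendarToCatalystTimestamp(cal) == 10000000L)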