@@ -86,8 +86,9 @@ public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
         col.getChildColumn(0).putInts(0, capacity, c.months);
         col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
       } else if (t instanceof DateType) {
-        Date date = (Date)row.get(fieldIdx, t);
-        col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+        col.putInts(0, capacity, row.getInt(fieldIdx));
+      } else if (t instanceof TimestampType) {
+        col.putLongs(0, capacity, row.getLong(fieldIdx));
       }
     }
   }
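
Note: the partition value handed to populate() is already in Catalyst's internal form, which is why the old cast to java.sql.Date failed and why the new branches read the primitive directly. A minimal Scala sketch of that representation, using the same DateTimeUtils helpers the tests below rely on:

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // DateType is carried as an Int counting days since the Unix epoch;
    // TimestampType as a Long counting microseconds since the epoch.
    val days: Int = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))
    val micros: Long =
      DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))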
@@ -137,6 +137,10 @@ public InternalRow copy() {
       DataType dt = columns[i].dataType();
       if (dt instanceof BooleanType) {
         row.setBoolean(i, getBoolean(i));
+      } else if (dt instanceof ByteType) {
+        row.setByte(i, getByte(i));
+      } else if (dt instanceof ShortType) {
+        row.setShort(i, getShort(i));
       } else if (dt instanceof IntegerType) {
         row.setInt(i, getInt(i));
       } else if (dt instanceof LongType) {
@@ -154,6 +158,8 @@ public InternalRow copy() {
         row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
       } else if (dt instanceof DateType) {
         row.setInt(i, getInt(i));
+      } else if (dt instanceof TimestampType) {
+        row.setLong(i, getLong(i));
       } else {
         throw new RuntimeException("Not implemented. " + dt);
       }
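
Note: copy() materializes the current columnar row field by field, picking the physical accessor from the logical Catalyst type; before this change, ByteType, ShortType, and TimestampType columns fell through to the RuntimeException branch. A hedged Scala sketch of the same dispatch, restricted to the branches this diff adds:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
    import org.apache.spark.sql.types._

    // Sketch only: copy one field, choosing the getter/setter pair by logical type.
    def copyField(src: InternalRow, dst: GenericMutableRow, dt: DataType, i: Int): Unit =
      dt match {
        case ByteType      => dst.setByte(i, src.getByte(i))
        case ShortType     => dst.setShort(i, src.getShort(i))
        case TimestampType => dst.setLong(i, src.getLong(i)) // microseconds since epoch
        case other         => throw new RuntimeException("Not implemented. " + other)
      }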
@@ -38,11 +38,12 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)
@@ -689,6 +690,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("VectorizedParquetRecordReader - partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("pcol", dt) :: Nil)
+        val vectorizedReader = new VectorizedParquetRecordReader
+        val partitionValues = new GenericMutableRow(Array(v))
+        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+        try {
+          vectorizedReader.initialize(file, null)
+          vectorizedReader.initBatch(schema, partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
+
+          // Copy the row into a `GenericMutableRow` explicitly rather than reading it
+          // through `ColumnarBatch`, whose rows do not implement get(...).
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          assert(actual == expected)
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
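
Note: in the test above, initBatch(schema, partitionValues) is what routes the constant partition values into the ColumnVectorUtils.populate path from the first hunk: each partition column is filled once per batch with the same internal value. A paraphrased sketch of that wiring (partitionSchema and dataColumnCount are hypothetical names, not the reader's actual fields):

    // Paraphrased sketch, not the actual initBatch body: append one vector per
    // partition column and broadcast the row's constant value into it.
    partitionSchema.fields.indices.foreach { i =>
      ColumnVectorUtils.populate(batch.column(dataColumnCount + i), partitionValues, i)
    }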
@@ -1787,6 +1787,27 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      sql(
+        """CREATE TABLE order(id INT)
+          |PARTITIONED BY (pd DATE, pt TIMESTAMP)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql(
+        """INSERT INTO TABLE order PARTITION(pd, pt)
+          |SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt
+        """.stripMargin)
+      val actual = sql("SELECT * FROM order")
+      val expected = sql(
+        "SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt")
+      checkAnswer(actual, expected)
+      sql("DROP TABLE order")
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0
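
Note: before this patch, the scenario in the Hive test above failed under the vectorized reader, because populate() cast the internal Integer partition value to java.sql.Date and had no TimestampType branch at all. When reproducing, the code path can be toggled through the conf key behind SQLConf.PARQUET_VECTORIZED_READER_ENABLED; a hedged sketch:

    // Flipping the vectorized reader off and on isolates whether a failure
    // comes from this code path.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
    spark.sql("SELECT * FROM order").show() // row-at-a-time path
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
    spark.sql("SELECT * FROM order").show() // vectorized path exercised by the fix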