@@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -697,6 +696,73 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(versionSpark.table("t1").collect() === Array(Row(2)))
}
}

test(s"$version: Decimal support of Avro Hive serde") {
Member:
We should consider generalizing this test for all supported Avro logical types: https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types. Perhaps add a TODO?

gatorsmile (Member Author):
Yes. Let me post a TODO there.
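To make that TODO concrete, a generalized version of this test might feed the serde a schema covering the remaining Avro 1.8 logical types. A minimal sketch, not part of this patch (the record and field names are made up; the logical types are taken from the spec linked above):

// Hypothetical sketch only, not part of this patch: a schema exercising
// more Avro 1.8 logical types (date, time-millis, timestamp-millis).
// The record and field names are illustrative.
val extendedAvroSchema =
  """{
    |  "name": "test_record_logical_types",
    |  "type": "record",
    |  "fields": [
    |    { "name": "f1", "type": { "type": "int", "logicalType": "date" } },
    |    { "name": "f2", "type": { "type": "int", "logicalType": "time-millis" } },
    |    { "name": "f3", "type": { "type": "long", "logicalType": "timestamp-millis" } }
    |  ]
    |}
  """.stripMargin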

  val tableName = "tab1"
  // TODO: add the other logical types. For details, see the link:
  // https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types
  val avroSchema =
    """{
      |  "name": "test_record",
      |  "type": "record",
      |  "fields": [ {
      |    "name": "f0",
      |    "type": [
      |      "null",
      |      {
      |        "precision": 38,
      |        "scale": 2,
      |        "type": "bytes",
      |        "logicalType": "decimal"
      |      }
      |    ]
      |  } ]
      |}
    """.stripMargin

  Seq(true, false).foreach { isPartitioned =>
    withTable(tableName) {
      val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else ""
      // Creates the (non-)partitioned Avro table
      versionSpark.sql(
        s"""
           |CREATE TABLE $tableName
           |$partitionClause
           |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
           |STORED AS
           |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
           |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
           |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
         """.stripMargin
      )

      // Hive 0.13 and earlier do not support the Avro decimal logical type
      // (HIVE-5823): the metastore records the column as binary, so inserting
      // a decimal value fails analysis with this cast error.
      val errorMsg = "data type mismatch: cannot cast DecimalType(2,1) to BinaryType"

      if (isPartitioned) {
        val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
        if (version == "0.12" || version == "0.13") {
gatorsmile (Member Author):
Hive 0.13 or prior does not support the Decimal logical type. See https://issues.apache.org/jira/browse/HIVE-5823.

Member:
Quick question: I was trying to understand why this depends on the metastore version, and realized that in the case of a schema mismatch (such as DecimalType vs. BinaryType here), HiveExternalCatalog always respects the table schema from Hive over the one from Spark SQL. Is it worth having this limitation for all generalized cases?

gatorsmile (Member Author), Aug 17, 2017:
Yes. Our dear Hive metastore changes the schema we inferred. We get the following warning message for 0.12 and 0.13:

12:04:09.816 WARN org.apache.spark.sql.hive.test.TestHiveExternalCatalog: The table schema given by Hive metastore(struct<f0:binary,ds:string>) is different from the schema when this table was created by Spark SQL(struct<f0:decimal(38,2),ds:string>). We have to fall back to the table schema from Hive metastore which is not case preserving.

gatorsmile (Member Author), Aug 17, 2017:
Will submit a separate PR for adding a conf to ignore the schema overwritten by the Hive metastore.
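One way to observe this fallback directly is to compare the schema Spark recorded in the table properties with what the catalog now reports. A minimal sketch, not part of this patch, assuming a Hive-enabled SparkSession named `spark` and the `tab1` table from this test:

// Hypothetical sketch: inspect whether the metastore rewrote the schema.
// HiveExternalCatalog keeps Spark's case-preserving schema in table
// properties (keys such as spark.sql.sources.schema.part.0).
spark.sql("SHOW TBLPROPERTIES tab1").show(truncate = false)
// DESCRIBE reports the schema actually in effect; after the fallback in the
// warning above, f0 shows up as binary rather than decimal(38,2).
spark.sql("DESCRIBE tab1").show(truncate = false)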

          val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
          assert(e.contains(errorMsg))
        } else {
          versionSpark.sql(insertStmt)
          assert(versionSpark.table(tableName).collect() ===
            versionSpark.sql("SELECT 1.30, 'a'").collect())
        }
      } else {
        val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3"
        if (version == "0.12" || version == "0.13") {
          val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
          assert(e.contains(errorMsg))
        } else {
          versionSpark.sql(insertStmt)
          assert(versionSpark.table(tableName).collect() ===
            versionSpark.sql("SELECT 1.30").collect())
        }
      }
    }
  }
}

// TODO: add more tests.
}
}