@@ -247,7 +247,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
StructField("new_field_1", IntegerType),
StructField("col1", IntegerType),
Contributor

@tejasapatil do you still remember why we updated it?

Contributor Author

Yes. This was done because, before this PR, the test case was removing bucketed columns in the alter operation. We decided to disallow removing bucketing columns and to support this in the future if needed.

Here is the discussion we both had about this: #17644 (comment)
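
A minimal sketch of the kind of validation being discussed, not code from this PR (the helper name and the error message are assumptions):

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.types.StructType

// Reject a new schema that drops any column the existing bucket spec relies on.
def checkBucketColumnsRetained(newSchema: StructType, bucketSpec: BucketSpec): Unit = {
  val missing = bucketSpec.bucketColumnNames.filterNot(newSchema.fieldNames.contains)
  require(missing.isEmpty,
    s"Bucketing column(s) ${missing.mkString(", ")} cannot be dropped by ALTER TABLE")
}
```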

Contributor Author

Huh. If I revert the change to this test case, it does not fail anymore. This is bad because the table properties still say that it's bucketed over col1, but col1 is not in the modified table schema. I am taking a look to see what changed.

StructField("new_field_2", StringType),
StructField("a", IntegerType),
StructField("b", StringType)))
@@ -1072,13 +1072,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
if (ctx.skewSpec != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}

val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
@@ -1119,6 +1118,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
tableType = tableType,
storage = storage,
schema = schema,
bucketSpec = bucketSpec,
provider = Some(DDLUtils.HIVE_PROVIDER),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
@@ -903,8 +903,13 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
}

if (metadata.bucketSpec.isDefined) {
throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
val bucketSpec = metadata.bucketSpec.get
builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n"

if (bucketSpec.sortColumnNames.nonEmpty) {
builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n"
}
builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n"
}
}
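
For illustration only (the table name and definition below are assumptions, not part of the PR): with this branch, SHOW CREATE TABLE on a bucketed Hive table emits the bucketing clauses instead of throwing.

```scala
// Assumes a SparkSession `spark` built with Hive support.
spark.sql(
  """CREATE TABLE t1 (a INT, b STRING)
    |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
    |STORED AS TEXTFILE""".stripMargin)

spark.sql("SHOW CREATE TABLE t1").show(truncate = false)
// The generated DDL should now include, roughly:
//   CLUSTERED BY (a)
//   SORTED BY (b ASC)
//   INTO 2 BUCKETS
```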

@@ -171,7 +171,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
location = fileIndex,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
@@ -199,7 +198,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
userSpecifiedSchema = Option(dataSchema),
// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
@@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
@@ -373,10 +374,30 @@ private[hive] class HiveClientImpl(
Option(client.getTable(dbName, tableName, false)).map { h =>
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val cols = h.getCols.asScala.map(fromHiveColumn)
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
val schema = StructType(cols ++ partCols)

val bucketSpec = if (h.getNumBuckets > 0) {
val sortColumnOrders = h.getSortCols.asScala
// Currently Spark only supports columns to be sorted in ascending order
// but Hive can support both ascending and descending order. If all the columns
// are sorted in ascending order, only then propagate the sortedness information
// to downstream processing / optimizations in Spark
// TODO: In future we can have Spark support columns sorted in descending order
val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)

val sortColumnNames = if (allAscendingSorted) {
sortColumnOrders.map(_.getCol)
} else {
Seq()
}
Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
} else {
None
}

// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
// Skew spec and storage handler can't be mapped to CatalogTable (yet)
val unsupportedFeatures = ArrayBuffer.empty[String]

if (!h.getSkewedColNames.isEmpty) {
@@ -387,10 +408,6 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "storage handler"
}

if (!h.getBucketCols.isEmpty) {
unsupportedFeatures += "bucketing"
}

if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
unsupportedFeatures += "partitioned view"
}
@@ -408,9 +425,11 @@ private[hive] class HiveClientImpl(
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
// We can not populate bucketing information for Hive tables as Spark SQL has a different
// implementation of hash function from Hive.
bucketSpec = None,
// If the table is written by Spark, we will put bucketing information in table properties,
// and will always overwrite the bucket spec in hive metastore by the bucketing information
// in table properties. This means, if we have bucket spec in both hive metastore and
// table properties, we will trust the one in table properties.
bucketSpec = bucketSpec,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -870,6 +889,23 @@ private[hive] object HiveClientImpl {
hiveTable.setViewOriginalText(t)
hiveTable.setViewExpandedText(t)
}

table.bucketSpec match {
case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
Contributor
@cloud-fan, May 15, 2017

After thinking about it more, I think this one is less important.

Our main goal is to allow Spark to read bucketed tables written by Hive, not to allow Hive to read bucketed tables written by Spark.

How about we remove it for now and add it later after more discussion?

Contributor Author

> not to allow Hive to read bucketed tables written by Spark.

If Hive is NOT able to read data source tables that are bucketed, that's OK since those are not Hive-compatible anyway. But for Hive native tables, interoperability between Spark and Hive is what I want.

Contributor

OK, makes sense.

hiveTable.setNumBuckets(bucketSpec.numBuckets)
hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)

if (bucketSpec.sortColumnNames.nonEmpty) {
hiveTable.setSortCols(
bucketSpec.sortColumnNames
.map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
.toList
.asJava
)
}
case _ =>
}

hiveTable
}

@@ -307,6 +307,27 @@ case class InsertIntoHiveTable(
}
}

table.bucketSpec match {
case Some(bucketSpec) =>
// Writes to bucketed hive tables are allowed only if user does not care about maintaining
// table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
// set to false
val enforceBucketingConfig = "hive.enforce.bucketing"
val enforceSortingConfig = "hive.enforce.sorting"

val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
"currently does NOT populate bucketed output which is compatible with Hive."

if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
throw new AnalysisException(message)
} else {
logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
s"$enforceSortingConfig are set to false.")
Contributor

Shall we remove the bucket properties of the table in this case? What does Hive do?

Contributor Author

  • In Hive 1.x:
    • If the enforcing configs are set: data is written respecting the table's bucketing spec.
    • If the enforcing configs are NOT set: data is written without conforming to the table's bucketing spec.
  • In Hive 2.x, these configs no longer exist and Hive always writes data conformant with the table's bucketing.

With Spark, the data is currently written out in a non-conformant way regardless of whether the config is set. This PR follows the Hive 1.x model. I am working on a follow-up PR which would make things behave like Hive 2.0.
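
A minimal sketch of what a user who accepts non-conformant output could do after this PR, mirroring the new test in InsertIntoHiveTableSuite (the table name is an assumption):

```scala
// Turning both enforcement flags off lets the INSERT proceed, at the cost of output
// files that do not respect the table's declared bucketing.
spark.conf.set("hive.enforce.bucketing", "false")
spark.conf.set("hive.enforce.sorting", "false")
spark.sql("INSERT INTO TABLE bucketed_tbl SELECT 1, 2, 3, 4")
```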

Contributor

So after insertion (if not enforcing), the table is still a bucketed table, but reading it will give wrong results?

Contributor Author

In Hive: it would lead to wrong results.

In Spark (on master and also after this PR): the table scan does not take bucketing into account, so the table would be read as a regular table. It won't be read "wrong"; it's just that we won't take advantage of the bucketing.
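
As a hedged illustration of that last point (the table and query are assumptions, not from the PR): even with the bucket spec recorded in the catalog, the Hive SerDe read path does not yet exploit it, so plans look the same as for an unbucketed table.

```scala
// A group-by over the bucketing column still shows a shuffle (Exchange) in the plan,
// because the scan does not report the table's bucketing to the planner.
spark.sql("SELECT a, count(*) FROM bucketed_tbl GROUP BY a").explain()
```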

}
case _ => // do nothing since table has no bucketing
}

val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
@@ -367,13 +367,32 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
}

test("create table - clustered by") {
val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
val query1 = s"$baseQuery INTO 10 BUCKETS"
val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
val e1 = intercept[ParseException] { parser.parsePlan(query1) }
val e2 = intercept[ParseException] { parser.parsePlan(query2) }
assert(e1.getMessage.contains("Operation not allowed"))
assert(e2.getMessage.contains("Operation not allowed"))
val numBuckets = 10
val bucketedColumn = "id"
val sortColumn = "id"
val baseQuery =
s"""
CREATE TABLE my_table (
$bucketedColumn int,
name string)
CLUSTERED BY($bucketedColumn)
"""

val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
val (desc1, _) = extractTableDesc(query1)
assert(desc1.bucketSpec.isDefined)
val bucketSpec1 = desc1.bucketSpec.get
assert(bucketSpec1.numBuckets == numBuckets)
assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec1.sortColumnNames.isEmpty)

val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
val (desc2, _) = extractTableDesc(query2)
assert(desc2.bucketSpec.isDefined)
val bucketSpec2 = desc2.bucketSpec.get
assert(bucketSpec2.numBuckets == numBuckets)
assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
}

test("create table - skewed by") {
@@ -495,6 +495,53 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
}
}

private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
test(s"Hive SerDe table - $testName") {
val hiveTable = "hive_table"

withTable(hiveTable) {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql(
s"""
|CREATE TABLE $hiveTable (a INT, d INT)
|PARTITIONED BY (b INT, c INT)
|CLUSTERED BY(a)
|SORTED BY(a, d) INTO 256 BUCKETS
|STORED AS TEXTFILE
""".stripMargin)
f(hiveTable)
}
}
}
}

testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
tableName =>
withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
}
}

testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
tableName =>
withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
}

test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
// Set hive.exec.stagingdir under the table directory without start with ".".
withSQLConf("hive.exec.stagingdir" -> "./test") {
@@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
}
}

test("hive bucketing is not supported") {
test("hive bucketing is supported") {
withTable("t1") {
createRawHiveTable(
sql(
s"""CREATE TABLE t1 (a INT, b STRING)
|CLUSTERED BY (a)
|SORTED BY (b)
|INTO 2 BUCKETS
""".stripMargin
)

val cause = intercept[AnalysisException] {
sql("SHOW CREATE TABLE t1")
}

assert(cause.getMessage.contains(" - bucketing"))
checkCreateTable("t1")
}
}

@@ -777,6 +777,26 @@ class HiveDDLSuite
}
}

test("desc table for Hive table - bucketed + sorted table") {
withTable("tbl") {
sql(s"""
CREATE TABLE tbl (id int, name string)
PARTITIONED BY (ds string)
CLUSTERED BY(id)
SORTED BY(id, name) INTO 1024 BUCKETS
""")

val x = sql("DESC FORMATTED tbl").collect()
assert(x.containsSlice(
Seq(
Row("Num Buckets", "1024", ""),
Row("Bucket Columns", "[`id`]", ""),
Row("Sort Columns", "[`id`, `name`]", "")
)
))
}
}

test("desc table for data source table using Hive Metastore") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
val tabName = "tab1"