Issue-83 Fix the repartitioning logic to handle statement IDs (qubole#84)

For UPDATE/DELETE, we were repartitioning based on encoded bucketIds so that all rows with the same bucket are processed by the same task.
However, rows can have the same bucket but different encoded bucketIds, since an encoded bucketId is composed of both the bucket and the statementId.
Hence, rows with the same bucket end up going to different tasks, which can cause conflicts because different tasks will be writing to the same delete delta bucket file.
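
As a rough illustration (not part of the change), assuming the encoded bucketId layout documented in TableWriter.scala below (3 version bits, 1 reserved bit, 12 bucket bits, 4 reserved bits, 12 statement-ID bits), two rows in the same bucket can carry different encoded values; the helper name encodeBucketId is hypothetical:

    // Hypothetical sketch of the encoding described in the diff below.
    def encodeBucketId(bucket: Int, statementId: Int, version: Int = 1): Int =
      (version << 29) | (bucket << 16) | statementId

    val rowFromStmt0 = encodeBucketId(bucket = 0, statementId = 0) // 0x20000000
    val rowFromStmt1 = encodeBucketId(bucket = 0, statementId = 1) // 0x20000001
    // Same bucket (0), different encoded bucketIds -- repartitioning on the raw
    // encoded value sends these rows to different tasks.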

Approved-by: Sourabh Goyal <sourabhg@qubole.com>
amoghmargoor authored Jul 24, 2020
1 parent 6abfb63 commit b0a5dce
Showing 2 changed files with 75 additions and 7 deletions.
54 changes: 54 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/MergeSuite.scala
@@ -162,6 +162,60 @@ class MergeSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
assert(thrown.getMessage().contains("UPDATE on the partition columns are not allowed"))
}

test("Check for Merge update on multi statements with 1 bucket") {
/** In this test the following is done:
* ** Insert a DF into the target table with statement id 1. The insert should create just one bucket file, i.e., bucket0000
* ** Insert a DF into the target table with no statement id. The insert should create just one bucket file, i.e., bucket0000
* ** Note: the encoded bucket ids of rows from the two inserts above will differ due to the difference in statement id
* ** Try to update one row from each of the above transactions. It is expected that both are updated
*/
val spark = helper.spark
import spark.sqlContext.implicits._
val targetTable = s"$DEFAULT_DBNAME.target_bucket1"
val sourceTable = s"$DEFAULT_DBNAME.source_bucket1"

helper.hiveExecute(s"create table $targetTable (i int) stored as orc tblproperties('transactional'='true')")
val df1 = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF().repartition(1)
val htable = HiveAcidTable.fromSparkSession(spark, targetTable)
htable.insertInto(df1, Some(1))
val df2 = spark.sparkContext.parallelize(Seq(4, 5, 6)).toDF().repartition(1)
htable.insertInto(df2)
helper.hiveExecute(s"create table $sourceTable (i int) stored as orc tblproperties('transactional' = 'true')")
helper.sparkSQL(s"insert into $sourceTable values (1), (4)")
helper.sparkSQL(s"Merge into $targetTable t using $sourceTable s on t.i = s.i when matched then update set i = s.i + 1")

val res = helper.sparkCollect(s"select * from $targetTable order by i")
val expected = s"2\n2\n3\n5\n5\n6"
helper.compareResult(expected, res)
}

test("Check for Merge update on multi statements with 2 buckets") {
/** In this test the following is done:
* ** Insert a DF into the target table with statement id 1. The insert should create just two bucket files, i.e., bucket0000, bucket0001
* ** Insert a DF into the target table with no statement id. The insert should create just two bucket files, i.e., bucket0000, bucket0001
* ** Note: the encoded bucket ids of rows from different transactions will differ due to the difference in statement id.
* ** Try to update all the rows from each of the above transactions. It is expected that all rows are updated.
*/
val spark = helper.spark
import spark.sqlContext.implicits._
val targetTable = s"$DEFAULT_DBNAME.target_bucket2"
val sourceTable = s"$DEFAULT_DBNAME.source_bucket2"

helper.hiveExecute(s"create table $targetTable (i int) stored as orc tblproperties('transactional'='true')")
val df1 = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).toDF().repartition(2)
val htable = HiveAcidTable.fromSparkSession(spark, targetTable)
htable.insertInto(df1, Some(1))
val df2 = spark.sparkContext.parallelize(Seq(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)).toDF().repartition(2)
htable.insertInto(df2)
helper.hiveExecute(s"create table $sourceTable (i int) stored as orc tblproperties('transactional' = 'true')")
helper.sparkSQL(s"insert into $sourceTable select * from $targetTable")
helper.sparkSQL(s"Merge into $targetTable t using $sourceTable s on t.i = s.i when matched then update set i = s.i + 1")

val res = helper.sparkCollect(s"select * from $targetTable order by i")
val expected = s"2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15\n16\n17\n18\n19\n20\n21"
helper.compareResult(expected, res)
}

// Merge test for full acid tables
def mergeTestWithJustInsert(tType: String, isPartitioned: Boolean): Unit = {
val tableNameSpark = if (isPartitioned) {
28 changes: 21 additions & 7 deletions src/main/scala/com/qubole/spark/hiveacid/writer/TableWriter.scala
@@ -21,20 +21,18 @@ package com.qubole.spark.hiveacid.writer

import scala.collection.JavaConverters._
import scala.language.implicitConversions

import com.qubole.spark.hiveacid._
import com.qubole.spark.hiveacid.hive.HiveAcidMetadata
import com.qubole.spark.hiveacid.writer.hive.{HiveAcidFullAcidWriter, HiveAcidInsertOnlyWriter, HiveAcidWriterOptions}
import com.qubole.spark.hiveacid.transaction._
import com.qubole.spark.hiveacid.util.SerializableConfiguration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command.AlterTableAddPartitionCommand
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

/**
@@ -151,19 +151,35 @@ private[hiveacid] class TableWriter(sparkSession: SparkSession,

val resultRDD =
operationType match {
// In order to read data from delete delts, hive uses mergesort, which requires
// In order to read data from delete deltas, hive uses mergesort, which requires
// originalWriteId, bucket, and rowId in ascending order, and currentWriteId in descending order.
// We take care of originalWriteId, bucket, and rowId in asc order here. We only write one file per bucket-transaction,
// hence currentWriteId remains the same throughout the file and doesn't need ordering.
//
// A deleted rowId needs to go to a bucket file with the same name as the original row's. To achieve this,
// we repartition into 4096 partitions (i.e maximum number of buckets).
// we repartition into 4096 partitions (i.e., the maximum number of buckets) based on the bucket ID.
// This ensures all rows of one bucket go to the same partition.
//
// ************** Repartitioning Logic *******************
//
// rowId.bucketId is composed of the following:
//
// top 3 bits - version code.
// next 1 bit - reserved for future use
// next 12 bits - the bucket ID
// next 4 bits - reserved for future use
// remaining 12 bits - the statement ID - 0-based numbering of all statements within a
// transaction. Each leg of a multi-insert statement gets a separate statement ID.
// The reserved bits align the fields so that the value is easier to interpret in hex.
//
// We need to repartition only on the basis of the 12 bits representing the bucket ID.
// We extract them as
// (rowId.bucketId AND 268369920 (0b00001111111111110000000000000000)) >>> (right shift by 16 bits)
//
// There is still a chance that rows from multiple buckets go to the same partition, but this is expected to work!
case HiveAcidOperation.DELETE | HiveAcidOperation.UPDATE =>
df.repartition(MAX_NUMBER_OF_BUCKETS, col("rowId.bucketId"))
.toDF.sortWithinPartitions("rowId.bucketId", "rowId.writeId", "rowId.rowId")
df.repartition(MAX_NUMBER_OF_BUCKETS, functions.expr("shiftright(rowId.bucketId & 268369920, 16)"))
.toDF.sortWithinPartitions("rowId.writeId", "rowId.bucketId", "rowId.rowId")
.toDF.queryExecution.executedPlan.execute()
case HiveAcidOperation.INSERT_OVERWRITE | HiveAcidOperation.INSERT_INTO =>
df.queryExecution.executedPlan.execute()
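
To make the new repartitioning expression concrete, here is a minimal sketch of the bucket-ID extraction it performs; the mask constant and helper below are illustrative, mirroring shiftright(rowId.bucketId & 268369920, 16) from the diff above:

    // 0x0FFF0000: bits 16..27 of the encoded bucketId hold the 12-bit bucket ID.
    val BucketIdMask = 268369920

    def extractBucketId(encodedBucketId: Int): Int =
      (encodedBucketId & BucketIdMask) >>> 16

    // Rows of bucket 3 written under different statement IDs decode to the same bucket,
    // so after the fix they land in the same partition.
    val fromStmt0 = (1 << 29) | (3 << 16)       // hypothetical encoded value, no statement id
    val fromStmt1 = (1 << 29) | (3 << 16) | 1   // hypothetical encoded value, statement id 1
    assert(extractBucketId(fromStmt0) == 3 && extractBucketId(fromStmt1) == 3)

In DataFrame terms, this amounts to repartitioning on the derived bucket-ID expression rather than on the raw encoded column, e.g. df.repartition(4096, functions.expr("shiftright(rowId.bucketId & 268369920, 16)")) as in TableWriter.scala above.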
