@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
     val hadoopConf = sessionState.newHadoopConf()
     val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed =
-      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }

       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
-        sessionState.conf.getConfString(
-          "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
-      {
+        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }

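Reading these properties through `hadoopConf` instead of `sessionState.conf.getConfString` gives the insert path one merged view of the Hive settings: `sessionState.newHadoopConf()` starts from the cluster Hadoop configuration (hive-site.xml, `spark.hadoop.*` entries, values set on `sparkContext.hadoopConfiguration`) and overlays the session's SQL conf entries on top. A minimal sketch of that lookup order, using a hypothetical `newHadoopConfLike` helper in place of Spark's `SessionState.newHadoopConf()`:

import org.apache.hadoop.conf.Configuration

// Hypothetical stand-in for sessionState.newHadoopConf(): copy the base Hadoop
// configuration, then overlay the session's SQL conf entries on top of it.
def newHadoopConfLike(base: Configuration, sqlConf: Map[String, String]): Configuration = {
  val merged = new Configuration(base)
  sqlConf.foreach { case (k, v) => if (v != null) merged.set(k, v) }
  merged
}

val base = new Configuration(false)
base.set("hive.exec.dynamic.partition.mode", "strict") // e.g. from hive-site.xml

// A session-level override wins, so the strict-mode check above is skipped.
val hadoopConf = newHadoopConfLike(base, Map("hive.exec.dynamic.partition.mode" -> "nonstrict"))
assert(hadoopConf.get("hive.exec.dynamic.partition.mode", "strict") == "nonstrict")

With the old `sessionState.conf.getConfString` call, only the SQL conf layer was consulted, so a value present only in the Hadoop configuration (the case exercised by the second half of the new test below) was ignored.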
@@ -26,31 +26,34 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils

 case class TestData(a: Int, b: String)

 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests
  * included in the hive distribution.
  */
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault

   import org.apache.spark.sql.hive.test.TestHive.implicits._

   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled

+  def spark: SparkSession = sparkSession
+
   override def beforeAll() {
     super.beforeAll()
     TestHive.setCacheTables(true)
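Mixing in `SQLTestUtils` is what supplies the `withTable` and `withSQLConf` helpers used by the new test below, and the trait needs a `spark` accessor, hence the added `def spark: SparkSession = sparkSession`. Roughly, `withSQLConf` sets the given entries on the session conf for the duration of a block and then restores whatever was there before; a simplified sketch of that pattern (not the actual SQLTestUtils code):

import org.apache.spark.sql.SparkSession

// Simplified illustration of the withSQLConf pattern: set the given SQL conf
// pairs, run the body, then restore the previous values (or unset the keys).
def withSQLConfLike(spark: SparkSession)(pairs: (String, String)*)(body: => Unit): Unit = {
  val previous = pairs.map { case (key, _) => key -> spark.conf.getOption(key) }
  pairs.foreach { case (key, value) => spark.conf.set(key, value) }
  try body finally {
    previous.foreach {
      case (key, Some(old)) => spark.conf.set(key, old)
      case (key, None) => spark.conf.unset(key)
    }
  }
}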
@@ -1199,6 +1202,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
   }
+
+  test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+    val modeConfKey = "hive.exec.dynamic.partition.mode"
+    withTable("with_parts") {
+      sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+      withSQLConf(modeConfKey -> "nonstrict") {
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+        assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+      }
+
+      val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+      try {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+        assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+      } finally {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      }
+    }
+  }
 }

 // for SPARK-2180 test
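For completeness, the user-visible effect of the main change: because the strict-mode check now goes through the Hadoop configuration built from the session state, setting `hive.exec.dynamic.partition.mode` either on the SQL conf or on `sparkContext.hadoopConfiguration` enables dynamic-partition inserts. A minimal usage sketch, assuming a local Hive-enabled build (the table and column names are illustrative only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("dynamic-partition-insert")
  .enableHiveSupport()
  .getOrCreate()

// Session-level setting; sessionState.newHadoopConf() copies it into the
// Hadoop configuration that InsertIntoHiveTable now consults.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.sql("CREATE TABLE IF NOT EXISTS with_parts(key INT) PARTITIONED BY (p INT)")
spark.sql("INSERT OVERWRITE TABLE with_parts PARTITION(p) SELECT 1, 2")
spark.sql("SELECT * FROM with_parts WHERE p = 2").show()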