AvroSuite.scala
@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+    val databricksAvro = "com.databricks.spark.avro"
+    // By default the backward compatibility for com.databricks.spark.avro is enabled.
+    Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
       assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
         classOf[org.apache.spark.sql.avro.AvroFileFormat])
     }
+
+    withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") {
+      val message = intercept[AnalysisException] {
+        DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $databricksAvro"))
+    }
   }
 
   test("reading from multiple paths") {
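
The test above exercises the mapping from the user side: with the flag at its default of true, the legacy Databricks provider name resolves to the same built-in implementation as the short name. A minimal usage sketch, assuming the external spark-avro module is on the classpath; the path is hypothetical:

    // Both provider names resolve to org.apache.spark.sql.avro.AvroFileFormat
    // while spark.sql.legacy.replaceDatabricksSparkAvro.enabled is true (the default).
    val viaNewName = spark.read.format("avro").load("/tmp/events.avro")
    val viaOldName = spark.read.format("com.databricks.spark.avro").load("/tmp/events.avro")
    assert(viaNewName.schema == viaOldName.schema)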
SQLConf.scala
@@ -1459,6 +1459,13 @@ object SQLConf {
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+    buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+      .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " +
+        "to the built-in but external Avro data source module for backward compatibility.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
     buildConf("spark.sql.legacy.setopsPrecedence.enabled")
       .internal()
@@ -1871,6 +1878,9 @@ class SQLConf extends Serializable with Logging {

   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+    getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =
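
Since this is a runtime SQL conf, the mapping can be toggled per session. A quick sketch of disabling it, using the key defined above:

    // After this, "com.databricks.spark.avro" no longer maps to the built-in
    // class; lookups of that name fail with an AnalysisException instead.
    spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", false)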
DataSource.scala
@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv,
"com.databricks.spark.avro" -> avro,

[Review thread on the line removed above]

HyukjinKwon (Member): @gengliangwang, not a big deal, but how about adding the entry at 618 here conditionally, since this is called a backward compatibility map?

gengliangwang (Member, Author), Aug 21, 2018: @HyukjinKwon I did add it in the backwardCompatibilityMap at first, but later I found that the configuration wouldn't take effect at runtime, since backwardCompatibilityMap is a val. (We could change backwardCompatibilityMap to a method to resolve that.)

Also, the code would look ugly:

val retMap = Map(...)
if (...) {
  retMap + (k -> v)
} else {
  retMap
}
// it would get worse with more configurations

HyukjinKwon (Member): Ah okay, makes sense if there's a reason.

"org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
"org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
)
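
For reference, the alternative discussed in the thread above (making backwardCompatibilityMap a method so the avro entry could be added here conditionally) might look roughly like the sketch below. This is not code from the PR; the conf parameter and the elided entries are illustrative assumptions:

    // Sketch: a def re-evaluates the conditional entry on every lookup,
    // unlike the current val, which is computed once at class initialization.
    private def backwardCompatibilityMap(conf: SQLConf): Map[String, String] = {
      val base = Map(
        "com.databricks.spark.csv" ->
          "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat"
        // ... the other static entries, unchanged ...
      )
      if (conf.replaceDatabricksSparkAvroEnabled) {
        base + ("com.databricks.spark.avro" -> "org.apache.spark.sql.avro.AvroFileFormat")
      } else {
        base
      }
    }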
@@ -616,6 +614,8 @@
       case name if name.equalsIgnoreCase("orc") &&
           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
         "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
+        "org.apache.spark.sql.avro.AvroFileFormat"
       case name => name
     }
     val provider2 = s"$provider1.DefaultSource"
@@ -637,6 +637,18 @@
"Hive built-in ORC data source must be used with Hive support enabled. " +
"Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
"'native'")
} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro" ||
provider1 == "org.apache.spark.sql.avro") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Avro is built-in but external data " +
"source module since Spark 2.4. Please deploy the application as per " +
"the deployment section of \"Apache Avro Data Source Guide\".")
} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Please deploy the application as " +
"per the deployment section of " +
"\"Structured Streaming + Kafka Integration Guide\".")
} else {
throw new ClassNotFoundException(
s"Failed to find data source: $provider1. Please find packages at " +
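
The user-visible effect of the new branches: when the external module is missing, resolution fails fast with a pointer to the relevant deployment guide instead of a generic ClassNotFoundException. A rough sketch, assuming a session without the spark-avro jar; the path is hypothetical:

    import org.apache.spark.sql.AnalysisException
    try {
      spark.read.format("avro").load("/tmp/anything.avro")
    } catch {
      case e: AnalysisException =>
        // Message points at the deployment section of the
        // "Apache Avro Data Source Guide".
        println(e.getMessage)
    }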
ResolvedDataSourceSuite.scala
@@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
   }
 
+  test("avro: show deploy guide for loading the external avro module") {
+    Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
+      val message = intercept[AnalysisException] {
+        getProvidingClass(provider)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $provider"))
+      assert(message.contains("Please deploy the application as per the deployment section of"))
+    }
+  }
+
+  test("kafka: show deploy guide for loading the external kafka module") {
+    val message = intercept[AnalysisException] {
+      getProvidingClass("kafka")
+    }.getMessage
+    assert(message.contains("Failed to find data source: kafka"))
+    assert(message.contains("Please deploy the application as per the deployment section of"))
+  }
+
   test("error message for unknown data sources") {
     val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")
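
The same behavior these tests pin down can be probed directly against lookupDataSource. A small sketch, assuming a SparkSession named spark and no external kafka or avro jars on the classpath:

    import org.apache.spark.sql.execution.datasources.DataSource
    // A built-in source resolves normally...
    DataSource.lookupDataSource("csv", spark.sessionState.conf)
    // ...while "kafka" or "avro" throws AnalysisException with deploy guidance.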