diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index c4f4d8efd6df..72bef9e3aed4 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+    val databricksAvro = "com.databricks.spark.avro"
+    // By default the backward compatibility for com.databricks.spark.avro is enabled.
+    Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
       assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
         classOf[org.apache.spark.sql.avro.AvroFileFormat])
     }
+
+    withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") {
+      val message = intercept[AnalysisException] {
+        DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $databricksAvro"))
+    }
   }
 
   test("reading from multiple paths") {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bffdddcf3fdb..94ba02337e64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1459,6 +1459,13 @@ object SQLConf {
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+    buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+      .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " +
+        "to the built-in but external Avro data source module for backward compatibility.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
     buildConf("spark.sql.legacy.setopsPrecedence.enabled")
       .internal()
@@ -1871,6 +1878,9 @@ class SQLConf extends Serializable with Logging {
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+    getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b1a10fdb6020..1dcf9f3185de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@ object DataSource extends Logging {
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
@@ -616,6 +614,8 @@ object DataSource extends Logging {
       case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
         "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
+        "org.apache.spark.sql.avro.AvroFileFormat"
       case name => name
     }
     val provider2 = s"$provider1.DefaultSource"
@@ -637,6 +637,18 @@ object DataSource extends Logging {
               "Hive built-in ORC data source must be used with Hive support enabled. " +
               "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
               "'native'")
+          } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
+            provider1 == "com.databricks.spark.avro" ||
+            provider1 == "org.apache.spark.sql.avro") {
+            throw new AnalysisException(
+              s"Failed to find data source: $provider1. Avro is built-in but external data " +
+                "source module since Spark 2.4. Please deploy the application as per " +
+                "the deployment section of \"Apache Avro Data Source Guide\".")
+          } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
+            throw new AnalysisException(
+              s"Failed to find data source: $provider1. Please deploy the application as " +
+                "per the deployment section of " +
+                "\"Structured Streaming + Kafka Integration Guide\".")
           } else {
             throw new ClassNotFoundException(
               s"Failed to find data source: $provider1. Please find packages at " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 95460fa70d8f..0aa67bf1b0d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
   }
 
+  test("avro: show deploy guide for loading the external avro module") {
+    Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
+      val message = intercept[AnalysisException] {
+        getProvidingClass(provider)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $provider"))
+      assert(message.contains("Please deploy the application as per the deployment section of"))
+    }
+  }
+
+  test("kafka: show deploy guide for loading the external kafka module") {
+    val message = intercept[AnalysisException] {
+      getProvidingClass("kafka")
+    }.getMessage
+    assert(message.contains("Failed to find data source: kafka"))
+    assert(message.contains("Please deploy the application as per the deployment section of"))
+  }
+
   test("error message for unknown data sources") {
     val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")