AvroSuite.scala
@@ -35,7 +35,6 @@ import org.apache.commons.io.FileUtils

 import org.apache.spark.SparkException
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
@@ -76,13 +75,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }

-  test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
-      assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
-        classOf[org.apache.spark.sql.avro.AvroFileFormat])
-    }
-  }
-
   test("reading from multiple paths") {
     val df = spark.read.format("avro").load(episodesAvro, episodesAvro)
     assert(df.count == 16)
@@ -503,7 +495,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       // get the same values back.
       withTempPath { tempDir =>
         val name = "AvroTest"
-        val namespace = "org.apache.spark.avro"
+        val namespace = "com.databricks.spark.avro"

Review comment (Contributor): Why change the namespace in this test?

         val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

         val avroDir = tempDir + "/namedAvro"
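For context: the recordName and recordNamespace options exercised by this test set the name and namespace of the top-level record in the written Avro schema. A minimal usage sketch, assuming a local SparkSession, the avro format on the classpath, and a hypothetical output path:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("named-avro").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    df.write
      .format("avro")
      .option("recordName", "AvroTest")                       // top-level record name
      .option("recordNamespace", "com.databricks.spark.avro") // top-level record namespace
      .save("/tmp/namedAvro")                                 // hypothetical output path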
DataSource.scala
@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@ object DataSource extends Logging {
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
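This is the backward-compatibility map that rewrites legacy provider names to current implementations before class lookup. With the avro entries gone, "avro" and "com.databricks.spark.avro" fall through to the explicit check added below. A simplified sketch of the rewriting step (an assumption about the shape of the logic, not the exact code):

    // Legacy names are first rewritten through the map, then resolved as classes.
    def rewriteProvider(name: String, compatMap: Map[String, String]): String =
      compatMap.getOrElse(name, name)

    // rewriteProvider("com.databricks.spark.csv", map)  -> the built-in CSV class name
    // rewriteProvider("com.databricks.spark.avro", map) -> unchanged; resolution now fails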
@@ -637,6 +635,12 @@ object DataSource extends Logging {
             "Hive built-in ORC data source must be used with Hive support enabled. " +
             "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
             "'native'")
+        } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
+          provider1 == "com.databricks.spark.avro") {
+          throw new AnalysisException(
+            s"Failed to find data source: ${provider1.toLowerCase(Locale.ROOT)}. " +
+              "AVRO is a built-in data source since Spark 2.4. Please deploy the application " +
+              "as per https://spark.apache.org/docs/latest/avro-data-source.html#deploying")

Review comment (Contributor, on the new check): Do we have the same check for Kafka?

Review comment (Member, author): I am creating documentation for the Avro data source. Let's merge this PR after the README is done.

         } else {
           throw new ClassNotFoundException(
             s"Failed to find data source: $provider1. Please find packages at " +
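The net effect: referencing avro (or the legacy com.databricks.spark.avro alias) on Spark 2.4 without the external module now fails fast with an actionable AnalysisException instead of a bare ClassNotFoundException. A sketch of what a user sees (assuming no spark-avro on the classpath; the path is hypothetical):

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      spark.read.format("com.databricks.spark.avro").load("/tmp/episodes.avro")
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // "Failed to find data source: com.databricks.spark.avro. ..."
    }

The remedy is to deploy the module as the linked guide describes, e.g. spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0.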
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (16 additions, 0 deletions)
@@ -1699,6 +1699,22 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
     assert(e.message.contains("Hive built-in ORC data source must be used with Hive support"))

+    e = intercept[AnalysisException] {
+      sql(s"select id from `com.databricks.spark.avro`.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: com.databricks.spark.avro."))
+
+    // data source type is case insensitive
+    e = intercept[AnalysisException] {
+      sql(s"select id from Avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro."))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro."))
+
     e = intercept[AnalysisException] {
       sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
     }
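As the "case insensitive" test notes, provider names are lowercased with Locale.ROOT rather than the JVM's default locale. That matters because default-locale case mapping is not stable across machines; a small standalone illustration (nothing here is from the patch itself):

    import java.util.Locale

    val tr = new Locale("tr", "TR")
    assert("Avro".toLowerCase(Locale.ROOT) == "avro")    // stable on every JVM
    assert("KINESIS".toLowerCase(tr) != "kinesis")       // Turkish 'I' lowercases to dotless 'ı'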
ResolvedDataSourceSuite.scala
@@ -77,9 +77,19 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
   }

   test("error message for unknown data sources") {
-    val error = intercept[ClassNotFoundException] {
+    val error1 = intercept[AnalysisException] {
+      getProvidingClass("avro")
+    }
+    assert(error1.getMessage.contains("Failed to find data source: avro."))
+
+    val error2 = intercept[AnalysisException] {
+      getProvidingClass("com.databricks.spark.avro")
+    }
+    assert(error2.getMessage.contains("Failed to find data source: com.databricks.spark.avro."))
+
+    val error3 = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")
     }
-    assert(error.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
+    assert(error3.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
   }
 }
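For reference, getProvidingClass is this suite's helper for resolving a provider name; a sketch of what it amounts to, assuming it delegates to DataSource.lookupDataSource (the same entry point the removed AvroSuite test exercised):

    import org.apache.spark.sql.execution.datasources.DataSource

    private def getProvidingClass(name: String): Class[_] =
      DataSource.lookupDataSource(name, spark.sessionState.conf)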