diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index ec1627a3898bf..8d11ed9465985 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -630,12 +630,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     intercept[FileNotFoundException] {
       withTempPath { dir =>
         FileUtils.touch(new File(dir, "test"))
-        val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-        try {
-          hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
           spark.read.format("avro").load(dir.toString)
-        } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
         }
       }
     }
@@ -709,15 +705,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark.read.format("avro").load(tempSaveDir)
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-      assert(count == 8)
     }
   }
 
@@ -880,20 +871,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Paths.get(new URL(episodesAvro).toURI),
         Paths.get(dir.getCanonicalPath, "episodes"))
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark
           .read
           .option("ignoreExtension", "true")
           .format("avro")
           .load(s"${dir.getCanonicalPath}/episodes")
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-
-      assert(count == 8)
     }
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
index f1579ec5844a4..1fae1dc04ad7b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
@@ -38,7 +38,9 @@ private object RecursiveFlag {
    */
   def withRecursiveFlag[T](value: Boolean, spark: SparkSession)(f: => T): T = {
     val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+    // scalastyle:off hadoopconfiguration
     val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
     val old = Option(hadoopConf.get(flagName))
     hadoopConf.set(flagName, value.toString)
     try f finally {
@@ -98,7 +100,9 @@ private object SamplePathFilter {
     val sampleImages = sampleRatio < 1
     if (sampleImages) {
       val flagName = FileInputFormat.PATHFILTER_CLASS
+      // scalastyle:off hadoopconfiguration
       val hadoopConf = spark.sparkContext.hadoopConfiguration
+      // scalastyle:on hadoopconfiguration
       val old = Option(hadoopConf.getClass(flagName, null))
       hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
       hadoopConf.setLong(SamplePathFilter.seedParam, seed)
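For readers unfamiliar with the test helper being adopted in the Avro suite above: `withSQLConf` (from `SQLTestUtils`) encapsulates the set/run/restore dance that the deleted `try`/`finally` blocks performed by hand, but it does so on the per-session SQL conf, and `spark.sessionState.newHadoopConf()` folds those SQL conf entries into a fresh Hadoop `Configuration`. The sketch below is a minimal standalone approximation of that pattern, not the actual `SQLTestUtils` implementation; the helper name `withSessionConf` is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ConfScope {
  /**
   * Minimal sketch of a set/run/restore helper over the session conf.
   * Sets the given key/value pairs, runs the body, then restores the
   * previous values (unsetting keys that were not set before).
   */
  def withSessionConf[T](spark: SparkSession)(pairs: (String, String)*)(body: => T): T = {
    val conf = spark.conf
    val previous = pairs.map { case (k, _) => k -> conf.getOption(k) }
    pairs.foreach { case (k, v) => conf.set(k, v) }
    try body finally {
      previous.foreach {
        case (k, Some(v)) => conf.set(k, v)
        case (k, None) => conf.unset(k)
      }
    }
  }
}
```

Because the keys are scoped to the session conf rather than written into the shared `sparkContext.hadoopConfiguration`, suites that share the same `SparkContext` no longer observe each other's temporary settings, and nothing is left behind if the body throws.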
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index db92132d18b7b..bbd5408c9fce3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -285,7 +285,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
     // There should be 1 checkpoint remaining.
     assert(model.getCheckpointFiles.length === 1)
     val checkpointFile = new Path(model.getCheckpointFiles.head)
-    val fs = checkpointFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val fs = checkpointFile.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointFile))
     model.deleteCheckpointFiles()
     assert(model.getCheckpointFiles.isEmpty)
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index e65e3aafe5b5b..da5c3f29c32dc 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -150,6 +150,19 @@ This file is divided into 3 sections:
     // scalastyle:on println]]></customMessage>
   </check>
 
+  <check customId="hadoopconfiguration" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">spark(.sqlContext)?.sparkContext.hadoopConfiguration</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use sparkContext.hadoopConfiguration? In most cases, you should use
+      spark.sessionState.newHadoopConf() instead, so that the hadoop configurations specified in Spark session
+      configuration will come into effect.
+      If you must use sparkContext.hadoopConfiguration, wrap the code block with
+      // scalastyle:off hadoopconfiguration
+      spark.sparkContext.hadoopConfiguration...
+      // scalastyle:on hadoopconfiguration
+    ]]></customMessage>
+  </check>
+
   <check customId="visiblefortesting" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">@VisibleForTesting</parameter></parameters>
     <customMessage><![CDATA[
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
@@ ... @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {
       val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
-      val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration)
+      val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
       val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
       reader.map(_.toString)
@@ -111,20 +111,20 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {
   }
 
   test("io.file.buffer.size is less than line length") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("io.file.buffer.size", "2")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
-      assert(lines == Seq("123456"))
+    withSQLConf("io.file.buffer.size" -> "2") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
+        assert(lines == Seq("123456"))
+      }
     }
   }
 
   test("line cannot be longer than line.maxlength") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("mapreduce.input.linerecordreader.line.maxlength", "5")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
-      assert(lines == Seq("1234"))
+    withSQLConf("mapreduce.input.linerecordreader.line.maxlength" -> "5") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
+        assert(lines == Seq("1234"))
+      }
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0b3de3d4cd599..728817729dcf7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -783,7 +783,7 @@ class HiveDDLSuite
       val part1 = Map("a" -> "1", "b" -> "5")
       val part2 = Map("a" -> "2", "b" -> "6")
       val root = new Path(catalog.getTableMetadata(tableIdent).location)
-      val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
       // valid
       fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
       fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
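Where mutating the shared, JVM-wide Hadoop configuration really is the point, as in `HadoopUtils.scala` above and `HiveQuerySuite.scala` below, the new scalastyle rule is silenced locally with `// scalastyle:off hadoopconfiguration` / `// scalastyle:on hadoopconfiguration`. The fragment below is an illustrative sketch of that escape-hatch pattern under an assumed helper name (`withSharedHadoopConf`); it is not part of this patch.

```scala
import org.apache.spark.sql.SparkSession

object SharedHadoopConfExample {
  /**
   * Illustrative only: temporarily set a key on the shared Hadoop configuration
   * and restore the previous value afterwards, suppressing the style check on purpose.
   */
  def withSharedHadoopConf[T](spark: SparkSession, key: String, value: String)(body: => T): T = {
    // scalastyle:off hadoopconfiguration
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    // scalastyle:on hadoopconfiguration
    val old = Option(hadoopConf.get(key))
    hadoopConf.set(key, value)
    try body finally {
      old match {
        case Some(v) => hadoopConf.set(key, v)
        case None => hadoopConf.unset(key)
      }
    }
  }
}
```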
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2ea51791d0f79..741b0124c83b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1177,13 +1177,18 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
       assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
     }
 
-    val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+    // Turn off style check since the following test is to modify hadoop configuration on purpose.
+    // scalastyle:off hadoopconfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
+
+    val originalValue = hadoopConf.get(modeConfKey, "nonstrict")
     try {
-      spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+      hadoopConf.set(modeConfKey, "nonstrict")
       sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
       assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
     } finally {
-      spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      hadoopConf.set(modeConfKey, originalValue)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 828c18a770c80..1a916824c5d9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2053,7 +2053,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
       deleteOnExitField.setAccessible(true)
 
-      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val fs = FileSystem.get(spark.sessionState.newHadoopConf())
      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
 
       val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
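Finally, a note on the consumer side of `spark.sessionState.newHadoopConf()`: it returns a fresh `Configuration` seeded from the shared Hadoop conf plus the current session's SQL conf entries, which is why the filesystem lookups in the suites above now pick up per-session settings without touching global state. The sketch below shows the shape of that usage; the function name and directory argument are illustrative, not from this patch.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object SessionHadoopConfExample {
  /** List the files under a directory using a session-derived Hadoop configuration. */
  def listFiles(spark: SparkSession, dir: String): Array[Path] = {
    // Fresh Configuration: shared Hadoop conf overlaid with this session's SQL conf entries.
    val hadoopConf = spark.sessionState.newHadoopConf()
    val path = new Path(dir)
    val fs: FileSystem = path.getFileSystem(hadoopConf)
    fs.listStatus(path).map(_.getPath)
  }
}
```

Because the returned `Configuration` is a copy, any mutation made on it stays local to the caller, which is exactly the isolation the scalastyle rule is meant to encourage.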