@@ -630,12 +630,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     intercept[FileNotFoundException] {
       withTempPath { dir =>
         FileUtils.touch(new File(dir, "test"))
-        val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-        try {
-          hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
           spark.read.format("avro").load(dir.toString)
-        } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
         }
       }
     }
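
The change above swaps a manual set/try/unset of a Hadoop property for the withSQLConf test helper, which restores the previous value even if the body throws. A rough sketch of what such a helper has to do, written against the public RuntimeConfig API (this is an illustration with a made-up name, not Spark's actual withSQLConf implementation):

import org.apache.spark.sql.SparkSession

// Illustration only: a set-then-restore helper in the spirit of withSQLConf.
def withSessionConf[T](spark: SparkSession)(pairs: (String, String)*)(body: => T): T = {
  val conf = spark.conf
  // Remember the previous values so they can be restored afterwards.
  val previous = pairs.map { case (key, _) => key -> conf.getOption(key) }
  pairs.foreach { case (key, value) => conf.set(key, value) }
  try body finally {
    previous.foreach {
      case (key, Some(oldValue)) => conf.set(key, oldValue)
      case (key, None)           => conf.unset(key)
    }
  }
}

The helper restores whatever value was there before instead of blindly unsetting the key, and it avoids mutating state that is shared by every test running on the same SparkContext.
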
@@ -709,15 +705,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark.read.format("avro").load(tempSaveDir)
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-      assert(count == 8)
     }
   }

@@ -880,20 +871,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Paths.get(new URL(episodesAvro).toURI),
         Paths.get(dir.getCanonicalPath, "episodes"))
 
-      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
-      val count = try {
-        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") {
         val newDf = spark
           .read
           .option("ignoreExtension", "true")
           .format("avro")
           .load(s"${dir.getCanonicalPath}/episodes")
-        newDf.count()
-      } finally {
-        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        assert(newDf.count() == 8)
       }
-
-      assert(count == 8)
     }
   }
 }
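
The added val hadoopConf = spark.sessionState.newHadoopConf() line also points at the behavioural difference this PR leans on: newHadoopConf() returns a fresh copy per call, while sparkContext.hadoopConfiguration is a single mutable object shared across the whole context. A small illustration, assuming a SparkSession named spark is in scope as in these suites (the key name is made up, sessionState is an unstable developer API, and inside Spark's own build the last line would need the scalastyle suppression introduced below):

val sessionCopy = spark.sessionState.newHadoopConf()
sessionCopy.set("example.made.up.key", "value")

// Neither a second copy nor the shared context-level conf sees the mutation.
assert(spark.sessionState.newHadoopConf().get("example.made.up.key") == null)
assert(spark.sparkContext.hadoopConfiguration.get("example.made.up.key") == null)
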
@@ -38,7 +38,9 @@ private object RecursiveFlag {
    */
   def withRecursiveFlag[T](value: Boolean, spark: SparkSession)(f: => T): T = {
     val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+    // scalastyle:off hadoopconfiguration
     val hadoopConf = spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
     val old = Option(hadoopConf.get(flagName))
     hadoopConf.set(flagName, value.toString)
     try f finally {
@@ -98,7 +100,9 @@ private object SamplePathFilter {
     val sampleImages = sampleRatio < 1
     if (sampleImages) {
       val flagName = FileInputFormat.PATHFILTER_CLASS
+      // scalastyle:off hadoopconfiguration
       val hadoopConf = spark.sparkContext.hadoopConfiguration
+      // scalastyle:on hadoopconfiguration
       val old = Option(hadoopConf.getClass(flagName, null))
       hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
       hadoopConf.setLong(SamplePathFilter.seedParam, seed)
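
Both helpers in this file keep the same save-and-restore discipline around the shared configuration: read the old value, set the new one, and put the old one back in a finally block. A generic sketch of that pattern (the helper name is ours, not something in Spark), for the cases where mutating a shared Configuration really is unavoidable:

import org.apache.hadoop.conf.Configuration

def withHadoopConfValue[T](conf: Configuration, key: String, value: String)(body: => T): T = {
  val old = Option(conf.get(key))   // remember the previous value, if any
  conf.set(key, value)
  try body finally {
    old match {
      case Some(previous) => conf.set(key, previous)   // restore the old value
      case None           => conf.unset(key)           // or remove the key entirely
    }
  }
}
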
@@ -285,7 +285,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
     // There should be 1 checkpoint remaining.
     assert(model.getCheckpointFiles.length === 1)
     val checkpointFile = new Path(model.getCheckpointFiles.head)
-    val fs = checkpointFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val fs = checkpointFile.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointFile))
     model.deleteCheckpointFiles()
     assert(model.getCheckpointFiles.isEmpty)
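
This one-line change is the idiom the rest of the PR standardises on for filesystem access in tests: resolve the FileSystem from the Path plus a per-session Hadoop configuration, so that fs.* options configured on the session are honoured. A minimal sketch, assuming a SparkSession named spark and a purely illustrative path:

import org.apache.hadoop.fs.Path

val checkpointPath = new Path("/tmp/example-checkpoint")        // hypothetical path
val fs = checkpointPath.getFileSystem(spark.sessionState.newHadoopConf())
val exists = fs.exists(checkpointPath)
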
scalastyle-config.xml (13 additions, 0 deletions)
@@ -150,6 +150,19 @@ This file is divided into 3 sections:
       // scalastyle:on println]]></customMessage>
   </check>
 
+  <check customId="hadoopconfiguration" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">spark(.sqlContext)?.sparkContext.hadoopConfiguration</parameter></parameters>
[Review comment: Contributor]
Can we just match on hadoopConfiguration?

[Review comment: gengliangwang (Member, author), Jul 26, 2018]
I think that would be tricky:
  1. In the core module, spark.sessionState.newHadoopConf() can't be accessed.
  2. Running find . -name *.scala | xargs grep hadoopConfiguration | wc -l turns up 100 appearances.

[Review comment: gengliangwang (Member, author)]
Even with such a loose policy, we would still have to wrap code blocks with "// scalastyle:off hadoopconfiguration" in some test cases that set the Hadoop conf.

+    <customMessage><![CDATA[
+      Are you sure that you want to use sparkContext.hadoopConfiguration? In most cases, you should use
+      spark.sessionState.newHadoopConf() instead, so that the hadoop configurations specified in Spark session
+      configuration will come into effect.
+      If you must use sparkContext.hadoopConfiguration, wrap the code block with
+      // scalastyle:off hadoopconfiguration
+      spark.sparkContext.hadoopConfiguration...
+      // scalastyle:on hadoopconfiguration
+    ]]></customMessage>
+  </check>
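
The "come into effect" wording in the message refers to how newHadoopConf() is built in the Spark versions this PR targets: it starts from the shared Hadoop configuration and then overlays every entry of the session's SQL conf on top. A short illustration (the key below is made up; sessionState is an unstable API, so treat this as a sketch rather than a contract):

spark.conf.set("fs.example.connection.timeout", "1000")   // hypothetical Hadoop key

val sessionHadoopConf = spark.sessionState.newHadoopConf()
assert(sessionHadoopConf.get("fs.example.connection.timeout") == "1000")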

<check customId="visiblefortesting" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">@VisibleForTesting</parameter></parameters>
<customMessage><![CDATA[
@@ -38,7 +38,7 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {

     val lines = ranges.map { case (start, length) =>
       val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
-      val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration)
+      val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
       val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
 
       reader.map(_.toString)
@@ -111,20 +111,20 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext {
   }
 
   test("io.file.buffer.size is less than line length") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("io.file.buffer.size", "2")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
-      assert(lines == Seq("123456"))
+    withSQLConf("io.file.buffer.size" -> "2") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
+        assert(lines == Seq("123456"))
+      }
     }
   }
 
   test("line cannot be longer than line.maxlength") {
-    val conf = spark.sparkContext.hadoopConfiguration
-    conf.set("mapreduce.input.linerecordreader.line.maxlength", "5")
-    withTempPath { path =>
-      val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
-      assert(lines == Seq("1234"))
+    withSQLConf("mapreduce.input.linerecordreader.line.maxlength" -> "5") {
+      withTempPath { path =>
+        val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
+        assert(lines == Seq("1234"))
+      }
     }
   }
 
@@ -783,7 +783,7 @@ class HiveDDLSuite
     val part1 = Map("a" -> "1", "b" -> "5")
     val part2 = Map("a" -> "2", "b" -> "6")
     val root = new Path(catalog.getTableMetadata(tableIdent).location)
-    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
     fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
@@ -1177,13 +1177,18 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
}

val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
// Turn off style check since the following test is to modify hadoop configuration on purpose.
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration

val originalValue = hadoopConf.get(modeConfKey, "nonstrict")
try {
spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
hadoopConf.set(modeConfKey, "nonstrict")
sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
} finally {
spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
hadoopConf.set(modeConfKey, originalValue)
}
}
}
@@ -2053,7 +2053,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
     deleteOnExitField.setAccessible(true)
 
-    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
     val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
 
     val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()