33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/VersionUtils.scala
@@ -24,6 +24,7 @@ private[spark] object VersionUtils {

private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
private val majorMinorPatchRegex = """^(\d+)(?:\.(\d+)(?:\.(\d+)(?:[.-].*)?)?)?$""".r

/**
* Given a Spark version string, return the major version number.
@@ -63,4 +64,36 @@ private[spark] object VersionUtils {
s" version string, but it could not find the major and minor version numbers.")
}
}

/**
* Extracts the major, minor, and patch parts from the input `version`. Note that if the minor
* or patch version is missing from the input, this will return 0 for those parts. Returns
* `None` if the input is not of a valid format.
[Review thread]
Contributor: I think we should mention that minor/patch versions are filled in as 0 if they're not found. This is different from the behavior of other methods in this class (e.g. majorMinor will give an error if minor is not present).

Member (Author): Yeah, good point.
[End review thread]

*
* Examples of valid versions:
* - 1 (extracts to (1, 0, 0))
* - 2.4 (extracts to (2, 4, 0))
* - 3.2.2 (extracts to (3, 2, 2))
* - 3.2.2.4 (extracts to (3, 2, 2))
* - 3.3.1-SNAPSHOT (extracts to (3, 3, 1))
* - 3.2.2.4SNAPSHOT (extracts to (3, 2, 2), only the first 3 components)
*
* Examples of invalid versions:
* - ABC
* - 1X
* - 2.4XYZ
* - 2.4-SNAPSHOT
* - 3.4.5ABC
*
* @return A non-empty option containing a 3-value tuple (major, minor, patch) iff the
* input is a valid version. `None` otherwise.
*/
def majorMinorPatchVersion(version: String): Option[(Int, Int, Int)] = {
majorMinorPatchRegex.findFirstMatchIn(version).map { m =>
val major = m.group(1).toInt
val minor = Option(m.group(2)).map(_.toInt).getOrElse(0)
val patch = Option(m.group(3)).map(_.toInt).getOrElse(0)
(major, minor, patch)
}
}
}
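
For illustration, a minimal usage sketch (not part of this diff) contrasting the new zero-filling behavior with the existing majorMinorVersion, which instead throws an IllegalArgumentException when the minor part is absent. The object name is hypothetical, and it sits in org.apache.spark.util because VersionUtils is private[spark]:

package org.apache.spark.util

// Hypothetical sketch, for illustration only.
object VersionParsingSketch {
  def main(args: Array[String]): Unit = {
    // Missing minor/patch parts are zero-filled by the new method.
    assert(VersionUtils.majorMinorPatchVersion("3").contains((3, 0, 0)))
    assert(VersionUtils.majorMinorPatchVersion("2.4").contains((2, 4, 0)))
    // A suffix is accepted only after a full major.minor.patch prefix.
    assert(VersionUtils.majorMinorPatchVersion("3.3.1-SNAPSHOT").contains((3, 3, 1)))
    assert(VersionUtils.majorMinorPatchVersion("2.4-SNAPSHOT").isEmpty)

    // The existing method throws instead of zero-filling.
    assert(VersionUtils.majorMinorVersion("2.4.0") == ((2, 4)))
    try {
      VersionUtils.majorMinorVersion("3") // no minor part
    } catch {
      case e: IllegalArgumentException =>
        println(s"majorMinorVersion rejects incomplete versions: ${e.getMessage}")
    }
  }
}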
14 changes: 14 additions & 0 deletions core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
@@ -98,4 +98,18 @@ class VersionUtilsSuite extends SparkFunSuite {
}
}
}

test("SPARK-33212: retrieve major/minor/patch version parts") {
assert(VersionUtils.majorMinorPatchVersion("3.2.2").contains((3, 2, 2)))
assert(VersionUtils.majorMinorPatchVersion("3.2.2.4").contains((3, 2, 2)))
assert(VersionUtils.majorMinorPatchVersion("3.2.2-SNAPSHOT").contains((3, 2, 2)))
assert(VersionUtils.majorMinorPatchVersion("3.2.2.4XXX").contains((3, 2, 2)))
assert(VersionUtils.majorMinorPatchVersion("3.2").contains((3, 2, 0)))
assert(VersionUtils.majorMinorPatchVersion("3").contains((3, 0, 0)))

// illegal cases
Seq("ABC", "3X", "3.2-SNAPSHOT", "3.2ABC", "3-ABC", "3.2.4XYZ").foreach { version =>
assert(VersionUtils.majorMinorPatchVersion(version).isEmpty, s"version $version")
}
}
}
11 changes: 11 additions & 0 deletions pom.xml
@@ -2448,6 +2448,17 @@
</rules>
</configuration>
</execution>
<execution>
<id>enforce-no-duplicate-dependencies</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<banDuplicatePomDependencyVersions/>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, Utils, VersionUtils}

/** Factory for `IsolatedClientLoader` with specific versions of hive. */
private[hive] object IsolatedClientLoader extends Logging {
@@ -107,12 +107,19 @@ private[hive] object IsolatedClientLoader extends Logging {
s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")
}

def supportsHadoopShadedClient(hadoopVersion: String): Boolean = {
VersionUtils.majorMinorPatchVersion(hadoopVersion).exists {
case (3, 2, v) if v >= 2 => true
case _ => false
[Review thread]
Contributor: Maybe

    case (maj, _, _) if maj > 3 => true
    case (3, min, _) if min > 2 => true
    case (3, 2, patch) if patch >= 2 => true

Seems like we can reasonably assume that future versions of Hadoop will support the shaded client?

Member (Author): I think we'd better wait until the future versions come out before changing this (so that we can verify first). For instance, Hadoop 3.3.0 currently doesn't support the shaded client (due to the hadoop-aws issue). But yes, Hadoop 3.2.2+ should support the shaded client, assuming there's no regression.

xkrogen (Contributor), Jan 22, 2021: Interesting... I just worry about forgetting to update this if/when we bump the Hadoop version in the future and causing a regression. Has the hadoop-aws fix made it to be targeted for Hadoop 3.3.1? If so, can we reasonably assume that 3.2.2+, 3.3.1+, and 3.4.0+ will have it?

It seems you're more tied into what's happening in the Hadoop world than I am these days, so I'll take your word in either direction. If we decide not to future-proof it, can we create a follow-up JIRA to revisit it once some future release is out, at which time we would be confident in putting in a wildcard?

Member (Author): I think your concern is valid. One thing we can do is perhaps add a test to make sure that the built-in Hadoop version is always compatible with the shaded client, so that if we upgrade the Hadoop version in the future and forget to update this, the test will break.

Member (Author): And yes, we can assume that 3.2.2+, 3.3.1+, and 3.4.0+ will all have the fix.

Contributor: Excellent idea on adding a compatibility test for the built-in Hadoop version!

Member: Got it. I agree that we need to change this again.

> And yes we can assume that 3.2.2+, 3.3.1+ and 3.4.0+ will all have the fix.

[End review thread]

}
}
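
For reference, a sketch of the future-proofed variant suggested in the thread above (the reviewer's proposal, not the code merged in this diff). The function name is hypothetical, and per the author's caveat the minor > 2 arm only becomes safe once Hadoop 3.3.1 is released, since 3.3.0 lacks shaded-client support:

  // Sketch of the reviewer's future-proofed suggestion (not part of this diff).
  // Caveat from the thread: Hadoop 3.3.0 does NOT support the shaded client
  // (hadoop-aws issue), so the minor > 2 arm assumes 3.3.1+ releases.
  def supportsHadoopShadedClientFutureProof(hadoopVersion: String): Boolean = {
    VersionUtils.majorMinorPatchVersion(hadoopVersion).exists {
      case (major, _, _) if major > 3 => true
      case (3, minor, _) if minor > 2 => true
      case (3, 2, patch) if patch >= 2 => true
      case _ => false
    }
  }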

private def downloadVersion(
version: HiveVersion,
hadoopVersion: String,
ivyPath: Option[String],
remoteRepos: String): Seq[URL] = {
-    val hadoopJarNames = if (hadoopVersion.startsWith("3")) {
+    val hadoopJarNames = if (supportsHadoopShadedClient(hadoopVersion)) {
Seq(s"org.apache.hadoop:hadoop-client-api:$hadoopVersion",
s"org.apache.hadoop:hadoop-client-runtime:$hadoopVersion")
} else {
@@ -123,22 +130,14 @@
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1") ++ hadoopJarNames

-    val extraExclusions = if (hadoopVersion.startsWith("3")) {
-      // this introduced from lower version of Hive could conflict with jars in Hadoop 3.2+, so
-      // exclude here in favor of the ones in Hadoop 3.2+
-      Seq("org.apache.hadoop:hadoop-auth")
-    } else {
[Review thread on removed lines 126 to 130]
Member: No need to exclude hadoop-auth anymore?

Member (Author): Yeah, it's no longer needed. Please see here and here for the context.
[End review thread]
-      Seq.empty
-    }

val classpaths = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
SparkSubmitUtils.buildIvySettings(
Some(remoteRepos),
ivyPath),
transitive = true,
-        exclusions = version.exclusions ++ extraExclusions)
+        exclusions = version.exclusions)
}
val allFiles = classpaths.map(new File(_)).toSet

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HadoopVersionInfoSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.net.URLClassLoader

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -68,4 +69,19 @@ class HadoopVersionInfoSuite extends SparkFunSuite {
Utils.deleteRecursively(ivyPath)
}
}

test("SPARK-32212: test supportHadoopShadedClient()") {
Seq("3.2.2", "3.2.3", "3.2.2.1", "3.2.2-XYZ", "3.2.2.4-SNAPSHOT").foreach { version =>
assert(IsolatedClientLoader.supportsHadoopShadedClient(version), s"version $version")
}

// negative cases
Seq("3.1.3", "3.2", "3.2.1", "4").foreach { version =>
assert(!IsolatedClientLoader.supportsHadoopShadedClient(version), s"version $version")
}
}

test("SPARK-32212: built-in Hadoop version should support shaded client") {
assert(IsolatedClientLoader.supportsHadoopShadedClient(VersionInfo.getVersion))
}
}