{
@Override
public OutputStream apply(OutputStream stream) {
return stream;
@@ -128,7 +128,7 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
(File) args[1],
(SerializerInstance) args[2],
(Integer) args[3],
- new CompressStream(),
+ new WrapStream(),
false,
(ShuffleWriteMetrics) args[4],
(BlockId) args[0]
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
new file mode 100644
index 000000000000..81eb907ac7ba
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.security.PrivilegedExceptionAction
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.security.CryptoStreamUtils._
+
+class CryptoStreamUtilsSuite extends SparkFunSuite {
+ val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+
+ test("Crypto configuration conversion") {
+ val sparkKey1 = s"${SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX}a.b.c"
+ val sparkVal1 = "val1"
+ val cryptoKey1 = s"${COMMONS_CRYPTO_CONF_PREFIX}a.b.c"
+
+ val sparkKey2 = SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.stripSuffix(".") + "A.b.c"
+ val sparkVal2 = "val2"
+ val cryptoKey2 = s"${COMMONS_CRYPTO_CONF_PREFIX}A.b.c"
+ val conf = new SparkConf()
+ conf.set(sparkKey1, sparkVal1)
+ conf.set(sparkKey2, sparkVal2)
+ val props = CryptoStreamUtils.toCryptoConf(conf)
+ assert(props.getProperty(cryptoKey1) === sparkVal1)
+ assert(!props.containsKey(cryptoKey2))
+ }
+
+ test("Shuffle encryption is disabled by default") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+ val conf = new SparkConf()
+ initCredentials(conf, credentials)
+ assert(credentials.getSecretKey(SPARK_IO_TOKEN) === null)
+ }
+ })
+ }
+
+ test("Shuffle encryption key length should be 128 by default") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+ val conf = new SparkConf()
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ initCredentials(conf, credentials)
+ val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+ assert(key !== null)
+ val actual = key.length * java.lang.Byte.SIZE
+ assert(actual === 128)
+ }
+ })
+ }
+
+ test("Initial credentials with key length in 256") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+ val conf = new SparkConf()
+ conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 256)
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ initCredentials(conf, credentials)
+ val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+ assert(key !== null)
+ val actual = key.length * java.lang.Byte.SIZE
+ assert(actual === 256)
+ }
+ })
+ }
+
+ test("Initial credentials with invalid key length") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials()
+ val conf = new SparkConf()
+ conf.set(IO_ENCRYPTION_KEY_SIZE_BITS, 328)
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ intercept[IllegalArgumentException] {
+ initCredentials(conf, credentials)
+ }
+ }
+ })
+ }
+
+ private[this] def initCredentials(conf: SparkConf, credentials: Credentials): Unit = {
+ if (conf.get(IO_ENCRYPTION_ENABLED)) {
+ SecurityManager.initIOEncryptionKey(conf, credentials)
+ }
+ }
+}
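
Side note on the first test above ("Crypto configuration conversion"): `toCryptoConf` itself is not shown in this diff, so here is a minimal sketch of the prefix-rewriting behavior the assertions imply — every entry under `spark.io.encryption.commons.config.*` is re-keyed under `commons.crypto.*`, and the second test key (which drops the trailing dot from the prefix) is left out. The body is an assumption reconstructed from the tests, not the patch's code; the two prefix constants are those imported from `CryptoStreamUtils._` in the suite.

```scala
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.security.CryptoStreamUtils._

// Assumed sketch of toCryptoConf: copy only keys carrying the Spark commons
// config prefix, re-rooting them under the commons-crypto prefix. A key built
// without the trailing dot (sparkKey2 above) fails startsWith and is skipped.
def toCryptoConfSketch(conf: SparkConf): Properties = {
  val props = new Properties()
  conf.getAll.foreach { case (k, v) =>
    if (k.startsWith(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX)) {
      val suffix = k.substring(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.length)
      props.setProperty(COMMONS_CRYPTO_CONF_PREFIX + suffix, v)
    }
  }
  props
}
```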
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 5132384a5ed7..ed9428820ff6 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -94,7 +94,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
args(1).asInstanceOf[File],
args(2).asInstanceOf[SerializerInstance],
args(3).asInstanceOf[Int],
- compressStream = identity,
+ wrapStream = identity,
syncWrites = false,
args(4).asInstanceOf[ShuffleWriteMetrics],
blockId = args(0).asInstanceOf[BlockId]
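
Both test mocks above pass an identity function for the parameter renamed from `compressStream` to `wrapStream`. The rename reflects that with this patch the wrapper no longer only compresses; a hypothetical sketch of the composition it stands in for (`wrapForEncryption` and `wrapForCompression` are assumed helper names, not shown in this diff):

```scala
import java.io.OutputStream

import org.apache.spark.storage.BlockId

// Hypothetical: the writer-side stream wrapper layers encryption beneath
// compression, so data is compressed first, then encrypted on the way out.
def wrapStream(blockId: BlockId, s: OutputStream): OutputStream =
  wrapForCompression(blockId, wrapForEncryption(s))
```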
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 326271a7e2b2..eaed0889ac36 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -27,6 +27,7 @@ commons-collections-3.2.2.jar
commons-compiler-2.7.6.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
+commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 1ff6ecb7342b..d68a7f462ba7 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -30,6 +30,7 @@ commons-collections-3.2.2.jar
commons-compiler-2.7.6.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
+commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 68333849cf4c..346f19767d36 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -30,6 +30,7 @@ commons-collections-3.2.2.jar
commons-compiler-2.7.6.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
+commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 787d06c3512d..6f4695f345a4 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -34,6 +34,7 @@ commons-collections-3.2.2.jar
commons-compiler-2.7.6.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
+commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 386495bf1bbb..7a86a8bd8884 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -34,6 +34,7 @@ commons-collections-3.2.2.jar
commons-compiler-2.7.6.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
+commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
diff --git a/docs/configuration.md b/docs/configuration.md
index 2f801961050e..d0c76aaad0b3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -559,6 +559,29 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.io.compression.codec</code>.
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption. Only supported in YARN mode.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keySizeBits</code></td>
+  <td>128</td>
+  <td>
+    IO encryption key size in bits. Supported values are 128, 192 and 256.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keygen.algorithm</code></td>
+  <td>HmacSHA1</td>
+  <td>
+    The algorithm to use when generating the IO encryption key. The supported algorithms are
+    described in the KeyGenerator section of the Java Cryptography Architecture Standard Algorithm
+    Name Documentation.
+  </td>
+</tr>
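
A hedged usage example for the three new properties documented above (values are illustrative; per the new test suite, an unsupported key size makes key generation fail with `IllegalArgumentException`):

```scala
import org.apache.spark.SparkConf

// Illustrative only: opt in to IO encryption (YARN mode) with a 256-bit key.
val conf = new SparkConf()
  .set("spark.io.encryption.enabled", "true")
  .set("spark.io.encryption.keySizeBits", "256")
  .set("spark.io.encryption.keygen.algorithm", "HmacSHA1")
```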
#### Spark UI
diff --git a/pom.xml b/pom.xml
index 989658216e5f..f90636a87491 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@
     <selenium.version>2.52.0</selenium.version>
     <paranamer.version>2.8</paranamer.version>
     <maven-antrun.version>1.8</maven-antrun.version>
+    <commons-crypto.version>1.0.0</commons-crypto.version>
     <test.java.home>${java.home}</test.java.home>
@@ -1839,6 +1840,17 @@
         <artifactId>jline</artifactId>
         <version>${jline.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-crypto</artifactId>
+        <version>${commons-crypto.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7fbbe91de94e..2398f0aea316 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1003,6 +1003,10 @@ private[spark] class Client(
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
+
+ if (sparkConf.get(IO_ENCRYPTION_ENABLED)) {
+ SecurityManager.initIOEncryptionKey(sparkConf, credentials)
+ }
setupSecurityToken(amContainer)
amContainer
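
For readers of this hunk: `SecurityManager.initIOEncryptionKey` is not shown in the diff, so the following is only a sketch of what it presumably does, pieced together from the config keys, the `SPARK_IO_TOKEN` usage in the tests, and standard JCA APIs — generate a key of the configured size with the configured algorithm and stash it in the Hadoop `Credentials` that YARN distributes. `IO_ENCRYPTION_KEYGEN_ALGORITHM` is an assumed constant name, and `SPARK_IO_TOKEN` is assumed to be the `Text` alias defined in `CryptoStreamUtils`.

```scala
import javax.crypto.KeyGenerator

import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.security.CryptoStreamUtils.SPARK_IO_TOKEN

// Assumed sketch: the key rides along with the other delegation tokens, so
// executors can decrypt shuffle data without a separate distribution path.
def initIOEncryptionKeySketch(conf: SparkConf, credentials: Credentials): Unit = {
  val keyLen = conf.get(IO_ENCRYPTION_KEY_SIZE_BITS)   // 128 by default
  val keyGen = KeyGenerator.getInstance(conf.get(IO_ENCRYPTION_KEYGEN_ALGORITHM))
  keyGen.init(keyLen)   // rejects invalid sizes such as 328, as the test expects
  credentials.addSecretKey(SPARK_IO_TOKEN, keyGen.generateKey().getEncoded)
}
```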
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala
new file mode 100644
index 000000000000..1c60315b21ae
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+import org.apache.spark.serializer._
+import org.apache.spark.storage._
+
+class IOEncryptionSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+ with BeforeAndAfterEach {
+ private[this] val blockId = new TempShuffleBlockId(UUID.randomUUID())
+ private[this] val conf = new SparkConf()
+ private[this] val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+ private[this] val serializer = new KryoSerializer(conf)
+
+ override def beforeAll(): Unit = {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ val creds = new Credentials()
+ SecurityManager.initIOEncryptionKey(conf, creds)
+ SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+ }
+ })
+ }
+
+ override def afterAll(): Unit = {
+ SparkEnv.set(null)
+ System.clearProperty("SPARK_YARN_MODE")
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ conf.set("spark.shuffle.compress", false.toString)
+ conf.set("spark.shuffle.spill.compress", false.toString)
+ }
+
+ test("IO encryption read and write") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit] {
+ override def run(): Unit = {
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ conf.set("spark.shuffle.compress", false.toString)
+ conf.set("spark.shuffle.spill.compress", false.toString)
+ testYarnIOEncryptionWriteRead()
+ }
+ })
+ }
+
+ test("IO encryption read and write with shuffle compression enabled") {
+ ugi.doAs(new PrivilegedExceptionAction[Unit] {
+ override def run(): Unit = {
+ conf.set(IO_ENCRYPTION_ENABLED, true)
+ conf.set("spark.shuffle.compress", true.toString)
+ conf.set("spark.shuffle.spill.compress", true.toString)
+ testYarnIOEncryptionWriteRead()
+ }
+ })
+ }
+
+ private[this] def testYarnIOEncryptionWriteRead(): Unit = {
+ val plainStr = "hello world"
+ val outputStream = new ByteArrayOutputStream()
+ val serializerManager = new SerializerManager(serializer, conf)
+ val wrappedOutputStream = serializerManager.wrapStream(blockId, outputStream)
+ wrappedOutputStream.write(plainStr.getBytes(StandardCharsets.UTF_8))
+ wrappedOutputStream.close()
+
+ val encryptedBytes = outputStream.toByteArray
+ val encryptedStr = new String(encryptedBytes)
+ assert(plainStr !== encryptedStr)
+
+ val inputStream = new ByteArrayInputStream(encryptedBytes)
+ val wrappedInputStream = serializerManager.wrapStream(blockId, inputStream)
+ val decryptedBytes = new Array[Byte](1024)
+ val len = wrappedInputStream.read(decryptedBytes)
+ val decryptedStr = new String(decryptedBytes, 0, len, StandardCharsets.UTF_8)
+ assert(decryptedStr === plainStr)
+ }
+}
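
The round-trip test above treats `SerializerManager.wrapStream` as a black box. For readers unfamiliar with the commons-crypto 1.0.0 dependency this patch adds, here is a standalone sketch of the same round trip written directly against its cipher streams; the AES/CTR transformation and manual key/IV handling are illustrative, not necessarily what Spark wires up internally:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.util.Properties
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

import org.apache.commons.crypto.stream.{CryptoInputStream, CryptoOutputStream}

// Illustrative AES/CTR round trip with the commons-crypto streams.
object CryptoRoundTripSketch extends App {
  val transformation = "AES/CTR/NoPadding"
  val props = new Properties()          // empty => library defaults
  val key = new Array[Byte](16)         // 128-bit key, the default size
  val iv = new Array[Byte](16)
  val rng = new SecureRandom()
  rng.nextBytes(key)
  rng.nextBytes(iv)
  val keySpec = new SecretKeySpec(key, "AES")
  val ivSpec = new IvParameterSpec(iv)

  // Encrypt on the way out...
  val sink = new ByteArrayOutputStream()
  val encOut = new CryptoOutputStream(transformation, props, sink, keySpec, ivSpec)
  encOut.write("hello world".getBytes(StandardCharsets.UTF_8))
  encOut.close()

  // ...and decrypt on the way back in, with the same key and IV.
  val encIn = new CryptoInputStream(
    transformation, props, new ByteArrayInputStream(sink.toByteArray), keySpec, ivSpec)
  val buf = new Array[Byte](1024)
  val n = encIn.read(buf)
  encIn.close()
  assert(new String(buf, 0, n, StandardCharsets.UTF_8) == "hello world")
}
```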