Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class AuthClientBootstrap implements TransportClientBootstrap {

private final TransportConf conf;
private final String appId;
private final String authUser;
private final SecretKeyHolder secretKeyHolder;

public AuthClientBootstrap(
Expand All @@ -65,7 +64,6 @@ public AuthClientBootstrap(
// required by the protocol. At some point, though, it would be better for the actual app ID
// to be provided here.
this.appId = appId;
this.authUser = secretKeyHolder.getSaslUser(appId);
this.secretKeyHolder = secretKeyHolder;
}

Expand Down Expand Up @@ -97,8 +95,8 @@ public void doBootstrap(TransportClient client, Channel channel) {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {

String secretKey = secretKeyHolder.getSecretKey(authUser);
try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
String secretKey = secretKeyHolder.getSecretKey(appId);
try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -113,7 +114,11 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
// Here we have the client challenge, so perform the new auth protocol and set up the channel.
AuthEngine engine = null;
try {
engine = new AuthEngine(challenge.appId, secretKeyHolder.getSecretKey(challenge.appId), conf);
String secret = secretKeyHolder.getSecretKey(challenge.appId);
Preconditions.checkState(secret != null,
"Trying to authenticate non-registered app %s.", challenge.appId);
LOG.debug("Authenticating challenge for app {}.", challenge.appId);
engine = new AuthEngine(challenge.appId, secret, conf);
ServerResponse response = engine.respond(challenge);
ByteBuf responseData = Unpooled.buffer(response.encodedLength());
response.encode(responseData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ShuffleSecretManager() {
* fetching shuffle files written by other executors in this application.
*/
public void registerApp(String appId, String shuffleSecret) {
if (!shuffleSecretMap.contains(appId)) {
if (!shuffleSecretMap.containsKey(appId)) {
shuffleSecretMap.put(appId, shuffleSecret);
logger.info("Registered shuffle secret for application {}", appId);
} else {
Expand All @@ -67,7 +67,7 @@ public void registerApp(String appId, ByteBuffer shuffleSecret) {
* This is called when the application terminates.
*/
public void unregisterApp(String appId) {
if (shuffleSecretMap.contains(appId)) {
if (shuffleSecretMap.containsKey(appId)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, this is actually fixing a memory leak...

shuffleSecretMap.remove(appId);
logger.info("Unregistered shuffle secret for application {}", appId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ public void initializeApplication(ApplicationInitializationContext context) {
String appId = context.getApplicationId().toString();
try {
ByteBuffer shuffleSecret = context.getApplicationDataForService();
logger.info("Initializing application {}", appId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is to verbose personally. We do log when initializeContainer so I guess its ok to remove this. But if you remove this I think we should remove the one in stopApplication as well since they go hand in hand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rebuilt my cluster this morning so I don't have the logs anymore... but what I noticed is that this callback gets called multiple times (once per container belonging to the app), while I only remember the stop callback being called when the application actually stops. So it didn't seem like the behavior was as symmetric as one would expect.

But the secret manager also prints an info message on unregistration, so I guess the important things are still logged even if these two messages are removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, here's an edited portion of the log (without this patch, so notice the duplicate registration messages too):

2017-07-24 14:38:11,839 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container container_1500931912895_0001_01_000002
2017-07-24 14:38:11,839 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application application_1500931912895_0001
2017-07-24 14:38:11,864 INFO org.apache.spark.network.sasl.ShuffleSecretManager: Registered shuffle secret for application application_1500931912895_0001
2017-07-24 14:38:13,506 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container container_1500931912895_0001_01_000008
2017-07-24 14:38:13,506 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application application_1500931912895_0001
2017-07-24 14:38:13,507 INFO org.apache.spark.network.sasl.ShuffleSecretManager: Registered shuffle secret for application application_1500931912895_0001
2017-07-24 14:38:13,542 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container container_1500931912895_0001_01_000007
2017-07-24 14:38:13,542 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application application_1500931912895_0001
2017-07-24 14:38:13,542 INFO org.apache.spark.network.sasl.ShuffleSecretManager: Registered shuffle secret for application application_1500931912895_0001
2017-07-24 14:38:14,393 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container container_1500931912895_0001_01_000010
2017-07-24 14:38:14,394 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application application_1500931912895_0001
2017-07-24 14:38:14,394 INFO org.apache.spark.network.sasl.ShuffleSecretManager: Registered shuffle secret for application application_1500931912895_0001
2017-07-24 14:38:14,394 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container container_1500931912895_0001_01_000011
2017-07-24 14:38:14,394 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application application_1500931912895_0001
2017-07-24 14:38:14,395 INFO org.apache.spark.network.sasl.ShuffleSecretManager: Registered shuffle secret for application application_1500931912895_0001



2017-07-24 14:39:13,454 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container container_1500931912895_0001_01_000007
2017-07-24 14:39:13,512 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container container_1500931912895_0001_01_000011
2017-07-24 14:39:13,614 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container container_1500931912895_0001_01_000008
2017-07-24 14:39:13,614 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container container_1500931912895_0001_01_000010
2017-07-24 14:39:13,616 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container container_1500931912895_0001_01_000002
2017-07-24 14:39:13,627 INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping application application_1500931912895_0001

if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
Expand All @@ -262,7 +261,6 @@ public void initializeApplication(ApplicationInitializationContext context) {
public void stopApplication(ApplicationTerminationContext context) {
String appId = context.getApplicationId().toString();
try {
logger.info("Stopping application {}", appId);
if (isAuthenticationEnabled()) {
AppId fullId = new AppId(appId);
if (db != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers

import org.apache.spark._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
import org.apache.spark.tags.ExtendedYarnTest
Expand All @@ -46,28 +48,58 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
yarnConfig
}

protected def extraSparkConf(): Map[String, String] = {
val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
val shuffleService = YarnTestAccessor.getShuffleServiceInstance
logInfo("Shuffle service port = " + shuffleServicePort)

Map(
"spark.shuffle.service.enabled" -> "true",
"spark.shuffle.service.port" -> shuffleServicePort.toString,
MAX_EXECUTOR_FAILURES.key -> "1"
)
}

test("external shuffle service") {
val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
val shuffleService = YarnTestAccessor.getShuffleServiceInstance

val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)

logInfo("Shuffle service port = " + shuffleServicePort)
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
extraConf = Map(
"spark.shuffle.service.enabled" -> "true",
"spark.shuffle.service.port" -> shuffleServicePort.toString
)
extraConf = extraSparkConf()
)
checkResult(finalState, result)
assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
}

/**
* Integration test for the external shuffle service with auth on.
*/
@ExtendedYarnTest
class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {

override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = super.newYarnConfig()
yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true")
yarnConfig
}

override protected def extraSparkConf(): Map[String, String] = {
super.extraSparkConf() ++ Map(
NETWORK_AUTH_ENABLED.key -> "true",
NETWORK_ENCRYPTION_ENABLED.key -> "true"
)
}

}

private object YarnExternalShuffleDriver extends Logging with Matchers {

val WAIT_TIMEOUT_MILLIS = 10000
Expand Down