Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private[history] abstract class ApplicationHistoryProvider {
* @return Count of application event logs that are currently under process
*/
def getEventLogsUnderProcess(): Int = {
return 0;
0
}

/**
Expand All @@ -95,7 +95,7 @@ private[history] abstract class ApplicationHistoryProvider {
* @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
*/
def getLastUpdatedTime(): Long = {
return 0;
0
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ object HistoryServer extends Logging {
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
val securityManager = new SecurityManager(conf)
val securityManager = createSecurityManager(conf)

val providerName = conf.getOption("spark.history.provider")
.getOrElse(classOf[FsHistoryProvider].getName())
Expand All @@ -289,6 +289,24 @@ object HistoryServer extends Logging {
while(true) { Thread.sleep(Int.MaxValue) }
}

/**
* Create a security manager.
* This turns off security in the SecurityManager, so that the History Server can start
* in a Spark cluster where security is enabled.
* @param config configuration for the SecurityManager constructor
* @return the security manager for use in constructing the History Server.
*/
private[history] def createSecurityManager(config: SparkConf): SecurityManager = {
if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) {
logInfo("Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " +
"only using spark.history.ui.acl.enable")
config.set("spark.acls.enable", "false")
config.set("spark.ui.acls.enable", "false")
}

new SecurityManager(config)
}

def initSecurity() {
// If we are accessing HDFS and it has security enabled (Kerberos), we have to login
// from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,27 @@ private[v1] class ApiRootResource extends ApiRequestContext {
@Path("applications/{appId}/logs")
def getEventLogs(
@PathParam("appId") appId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, None)
try {
// withSparkUI will throw NotFoundException if attemptId exists for this application.
// So we need to try again with attempt id "1".
withSparkUI(appId, None) { _ =>
new EventLogDownloadResource(uiRoot, appId, None)
}
} catch {
case _: NotFoundException =>
withSparkUI(appId, Some("1")) { _ =>
new EventLogDownloadResource(uiRoot, appId, None)
}
}
}

@Path("applications/{appId}/{attemptId}/logs")
def getEventLogs(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
withSparkUI(appId, Some(attemptId)) { _ =>
new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
}
}

@Path("version")
Expand Down Expand Up @@ -276,7 +289,6 @@ private[v1] trait ApiRequestContext {
case None => throw new NotFoundException("no such app: " + appId)
}
}

}

private[v1] class ForbiddenException(msg: String) extends WebApplicationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
assert(jobcount === getNumJobs("/jobs"))

// no need to retain the test dir now the tests complete
logDir.deleteOnExit();

logDir.deleteOnExit()
}

test("ui and api authorization checks") {
val appId = "local-1422981759269"
val appId = "local-1430917381535"
val owner = "irashid"
val admin = "root"
val other = "alice"
Expand All @@ -570,8 +569,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers

val port = server.boundPort
val testUrls = Seq(
s"http://localhost:$port/api/v1/applications/$appId/jobs",
s"http://localhost:$port/history/$appId/jobs/")
s"http://localhost:$port/api/v1/applications/$appId/1/jobs",
s"http://localhost:$port/history/$appId/1/jobs/",
s"http://localhost:$port/api/v1/applications/$appId/logs",
s"http://localhost:$port/api/v1/applications/$appId/1/logs",
s"http://localhost:$port/api/v1/applications/$appId/2/logs")

tests.foreach { case (user, expectedCode) =>
testUrls.foreach { url =>
Expand Down