diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala deleted file mode 100644 index f4bc752d31f7e..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllExecutorListResource(ui: SparkUI) { - - @GET - def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false) - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala deleted file mode 100644 index b4fa3e633f6c1..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.JobExecutionStatus -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.JobUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllJobsResource(ui: SparkUI) { - - @GET - def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { - ui.store.jobsList(statuses) - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala deleted file mode 100644 index 2189e1da91841..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils} -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllRDDResource(ui: SparkUI) { - - @GET - def rddList(): Seq[RDDStorageInfo] = ui.store.rddList() - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala deleted file mode 100644 index 036a7ba12e4dc..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs.{GET, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo} -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData} -import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics, OutputMetricsUIData => InternalOutputMetrics, ShuffleReadMetricsUIData => InternalShuffleReadMetrics, ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics, TaskMetricsUIData => InternalTaskMetrics} -import org.apache.spark.util.Distribution - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllStagesResource(ui: SparkUI) { - - @GET - def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { - ui.store.stageList(statuses) - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 9d3833086172f..ed9bdc6e1e3c2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -44,189 +44,14 @@ import org.apache.spark.ui.SparkUI private[v1] class ApiRootResource extends ApiRequestContext { @Path("applications") - def getApplicationList(): ApplicationListResource = { - new ApplicationListResource(uiRoot) - } + def applicationList(): Class[ApplicationListResource] = classOf[ApplicationListResource] @Path("applications/{appId}") - def getApplication(): OneApplicationResource = { - new OneApplicationResource(uiRoot) - } - - @Path("applications/{appId}/{attemptId}/jobs") - def getJobs( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllJobsResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs") - def getJobs(@PathParam("appId") appId: String): AllJobsResource = { - withSparkUI(appId, None) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs/{jobId: \\d+}") - def getJob(@PathParam("appId") appId: String): OneJobResource = { - withSparkUI(appId, None) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}") - def getJob( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneJobResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/executors") - def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = { - withSparkUI(appId, None) { ui => - new ExecutorListResource(ui) - } - } - - @Path("applications/{appId}/allexecutors") - def getAllExecutors(@PathParam("appId") appId: String): AllExecutorListResource = { - withSparkUI(appId, None) { ui => - new AllExecutorListResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/executors") - def getExecutors( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ExecutorListResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ExecutorListResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/allexecutors") - def getAllExecutors( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllExecutorListResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllExecutorListResource(ui) - } - } - - @Path("applications/{appId}/stages") - def getStages(@PathParam("appId") appId: String): AllStagesResource = { - withSparkUI(appId, None) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages") - def getStages( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllStagesResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/stages/{stageId: \\d+}") - def getStage(@PathParam("appId") appId: String): OneStageResource = { - withSparkUI(appId, None) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}") - def getStage( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneStageResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd") - def getRdds(@PathParam("appId") appId: String): AllRDDResource = { - withSparkUI(appId, None) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/storage/rdd") - def getRdds( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllRDDResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd/{rddId: \\d+}") - def getRdd(@PathParam("appId") appId: String): OneRDDResource = { - withSparkUI(appId, None) { ui => - new OneRDDResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}") - def getRdd( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneRDDResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneRDDResource(ui) - } - } - - @Path("applications/{appId}/logs") - def getEventLogs( - @PathParam("appId") appId: String): EventLogDownloadResource = { - try { - // withSparkUI will throw NotFoundException if attemptId exists for this application. - // So we need to try again with attempt id "1". - withSparkUI(appId, None) { _ => - new EventLogDownloadResource(uiRoot, appId, None) - } - } catch { - case _: NotFoundException => - withSparkUI(appId, Some("1")) { _ => - new EventLogDownloadResource(uiRoot, appId, None) - } - } - } - - @Path("applications/{appId}/{attemptId}/logs") - def getEventLogs( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): EventLogDownloadResource = { - withSparkUI(appId, Some(attemptId)) { _ => - new EventLogDownloadResource(uiRoot, appId, Some(attemptId)) - } - } + def application(): Class[OneApplicationResource] = classOf[OneApplicationResource] @Path("version") - def getVersion(): VersionResource = { - new VersionResource(uiRoot) - } - - @Path("applications/{appId}/environment") - def getEnvironment(@PathParam("appId") appId: String): ApplicationEnvironmentResource = { - withSparkUI(appId, None) { ui => - new ApplicationEnvironmentResource(ui) - } - } + def version(): VersionInfo = new VersionInfo(org.apache.spark.SPARK_VERSION) - @Path("applications/{appId}/{attemptId}/environment") - def getEnvironment( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ApplicationEnvironmentResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ApplicationEnvironmentResource(ui) - } - } } private[spark] object ApiRootResource { @@ -293,23 +118,29 @@ private[v1] trait ApiRequestContext { def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext) +} - /** - * Get the spark UI with the given appID, and apply a function - * to it. If there is no such app, throw an appropriate exception - */ - def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { +/** + * Base class for resource handlers that use app-specific data. Abstracts away dealing with + * application and attempt IDs, and finding the app's UI. + */ +private[v1] trait BaseAppResource extends ApiRequestContext { + + @PathParam("appId") protected[this] var appId: String = _ + @PathParam("attemptId") protected[this] var attemptId: String = _ + + protected def withUI[T](fn: SparkUI => T): T = { try { - uiRoot.withSparkUI(appId, attemptId) { ui => + uiRoot.withSparkUI(appId, Option(attemptId)) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } - f(ui) + fn(ui) } } catch { case _: NoSuchElementException => - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId) throw new NotFoundException(s"no such app: $appKey") } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala deleted file mode 100644 index e702f8aa2ef2d..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ApplicationEnvironmentResource(ui: SparkUI) { - - @GET - def getEnvironmentInfo(): ApplicationEnvironmentInfo = { - ui.store.environmentInfo() - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index a0239266d8756..27c780ce81428 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -23,7 +23,7 @@ import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo @Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ApplicationListResource(uiRoot: UIRoot) { +private[v1] class ApplicationListResource extends ApiRequestContext { @GET def appList( diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala deleted file mode 100644 index c84022ddfeef0..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.io.OutputStream -import java.util.zip.ZipOutputStream -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.{MediaType, Response, StreamingOutput} - -import scala.util.control.NonFatal - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging - -@Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) -private[v1] class EventLogDownloadResource( - val uIRoot: UIRoot, - val appId: String, - val attemptId: Option[String]) extends Logging { - val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf) - - @GET - def getEventLogs(): Response = { - try { - val fileName = { - attemptId match { - case Some(id) => s"eventLogs-$appId-$id.zip" - case None => s"eventLogs-$appId.zip" - } - } - - val stream = new StreamingOutput { - override def write(output: OutputStream): Unit = { - val zipStream = new ZipOutputStream(output) - try { - uIRoot.writeEventLogs(appId, attemptId, zipStream) - } finally { - zipStream.close() - } - - } - } - - Response.ok(stream) - .header("Content-Disposition", s"attachment; filename=$fileName") - .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM) - .build() - } catch { - case NonFatal(e) => - Response.serverError() - .entity(s"Event logs are not available for app: $appId.") - .status(Response.Status.SERVICE_UNAVAILABLE) - .build() - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala deleted file mode 100644 index 03f359262332b..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ExecutorListResource(ui: SparkUI) { - - @GET - def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true) - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala index 18c3e2f407360..bd4df07e7afc6 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala @@ -16,16 +16,150 @@ */ package org.apache.spark.status.api.v1 -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType +import java.io.OutputStream +import java.util.{List => JList} +import java.util.zip.ZipOutputStream +import javax.ws.rs.{GET, Path, PathParam, Produces, QueryParam} +import javax.ws.rs.core.{MediaType, Response, StreamingOutput} + +import scala.util.control.NonFatal + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.ui.SparkUI @Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneApplicationResource(uiRoot: UIRoot) { +private[v1] class AbstractApplicationResource extends BaseAppResource { + + @GET + @Path("jobs") + def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { + withUI(_.store.jobsList(statuses)) + } + + @GET + @Path("jobs/{jobId: \\d+}") + def oneJob(@PathParam("jobId") jobId: Int): JobData = withUI { ui => + try { + ui.store.job(jobId) + } catch { + case _: NoSuchElementException => + throw new NotFoundException("unknown job: " + jobId) + } + } + + @GET + @Path("executors") + def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true)) + + @GET + @Path("allexecutors") + def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false)) + + @Path("stages") + def stages(): Class[StagesResource] = classOf[StagesResource] + + @GET + @Path("storage/rdd") + def rddList(): Seq[RDDStorageInfo] = withUI(_.store.rddList()) + + @GET + @Path("storage/rdd/{rddId: \\d+}") + def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = withUI { ui => + try { + ui.store.rdd(rddId) + } catch { + case _: NoSuchElementException => + throw new NotFoundException(s"no rdd found w/ id $rddId") + } + } + + @GET + @Path("environment") + def environmentInfo(): ApplicationEnvironmentInfo = withUI(_.store.environmentInfo()) + + @GET + @Path("logs") + @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) + def getEventLogs(): Response = { + // Retrieve the UI for the application just to do access permission checks. For backwards + // compatibility, this code also tries with attemptId "1" if the UI without an attempt ID does + // not exist. + try { + withUI { _ => } + } catch { + case _: NotFoundException if attemptId == null => + attemptId = "1" + withUI { _ => } + attemptId = null + } + + try { + val fileName = if (attemptId != null) { + s"eventLogs-$appId-$attemptId.zip" + } else { + s"eventLogs-$appId.zip" + } + + val stream = new StreamingOutput { + override def write(output: OutputStream): Unit = { + val zipStream = new ZipOutputStream(output) + try { + uiRoot.writeEventLogs(appId, Option(attemptId), zipStream) + } finally { + zipStream.close() + } + + } + } + + Response.ok(stream) + .header("Content-Disposition", s"attachment; filename=$fileName") + .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM) + .build() + } catch { + case NonFatal(e) => + Response.serverError() + .entity(s"Event logs are not available for app: $appId.") + .status(Response.Status.SERVICE_UNAVAILABLE) + .build() + } + } + + /** + * This method needs to be last, otherwise it clashes with the paths for the above methods + * and causes JAX-RS to not find things. + */ + @Path("{attemptId}") + def applicationAttempt(): Class[OneApplicationAttemptResource] = { + if (attemptId != null) { + throw new NotFoundException(httpRequest.getRequestURI()) + } + classOf[OneApplicationAttemptResource] + } + +} + +private[v1] class OneApplicationResource extends AbstractApplicationResource { + + @GET + def getApp(): ApplicationInfo = { + val app = uiRoot.getApplicationInfo(appId) + app.getOrElse(throw new NotFoundException("unknown app: " + appId)) + } + +} + +private[v1] class OneApplicationAttemptResource extends AbstractApplicationResource { @GET - def getApp(@PathParam("appId") appId: String): ApplicationInfo = { - val apps = uiRoot.getApplicationInfo(appId) - apps.getOrElse(throw new NotFoundException("unknown app: " + appId)) + def getAttempt(): ApplicationAttemptInfo = { + uiRoot.getApplicationInfo(appId) + .flatMap { app => + app.attempts.filter(_.attemptId == attemptId).headOption + } + .getOrElse { + throw new NotFoundException(s"unknown app $appId, attempt $attemptId") + } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala deleted file mode 100644 index 3ee884e084c12..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.NoSuchElementException -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneJobResource(ui: SparkUI) { - - @GET - def oneJob(@PathParam("jobId") jobId: Int): JobData = { - try { - ui.store.job(jobId) - } catch { - case _: NoSuchElementException => - throw new NotFoundException("unknown job: " + jobId) - } - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala deleted file mode 100644 index ca9758cf0d109..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.NoSuchElementException -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneRDDResource(ui: SparkUI) { - - @GET - def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { - try { - ui.store.rdd(rddId) - } catch { - case _: NoSuchElementException => - throw new NotFoundException(s"no rdd found w/ id $rddId") - } - } - -} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala similarity index 77% rename from core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala rename to core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 20dd73e916613..bd4dfe3c68885 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.status.api.v1 +import java.util.{List => JList} import javax.ws.rs._ import javax.ws.rs.core.MediaType @@ -27,27 +28,34 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.StageUIData @Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneStageResource(ui: SparkUI) { +private[v1] class StagesResource extends BaseAppResource { @GET - @Path("") + def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { + withUI(_.store.stageList(statuses)) + } + + @GET + @Path("{stageId: \\d+}") def stageData( @PathParam("stageId") stageId: Int, @QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData] = { - val ret = ui.store.stageData(stageId, details = details) - if (ret.nonEmpty) { - ret - } else { - throw new NotFoundException(s"unknown stage: $stageId") + withUI { ui => + val ret = ui.store.stageData(stageId, details = details) + if (ret.nonEmpty) { + ret + } else { + throw new NotFoundException(s"unknown stage: $stageId") + } } } @GET - @Path("/{stageAttemptId: \\d+}") + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}") def oneAttemptData( @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int, - @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = { + @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = withUI { ui => try { ui.store.stageAttempt(stageId, stageAttemptId, details = details) } catch { @@ -57,12 +65,12 @@ private[v1] class OneStageResource(ui: SparkUI) { } @GET - @Path("/{stageAttemptId: \\d+}/taskSummary") + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskSummary") def taskSummary( @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int, @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String) - : TaskMetricDistributions = { + : TaskMetricDistributions = withUI { ui => val quantiles = quantileString.split(",").map { s => try { s.toDouble @@ -76,14 +84,14 @@ private[v1] class OneStageResource(ui: SparkUI) { } @GET - @Path("/{stageAttemptId: \\d+}/taskList") + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskList") def taskList( @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int, @DefaultValue("0") @QueryParam("offset") offset: Int, @DefaultValue("20") @QueryParam("length") length: Int, @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { - ui.store.taskList(stageId, stageAttemptId, offset, length, sortBy) + withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy)) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala deleted file mode 100644 index 673da1ce36b57..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class VersionResource(ui: UIRoot) { - - @GET - def getVersionInfo(): VersionInfo = new VersionInfo( - org.apache.spark.SPARK_VERSION - ) - -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala deleted file mode 100644 index 3a51ae609303a..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList} -import javax.ws.rs.{GET, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.streaming.AllBatchesResource._ -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) { - - @GET - def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = { - batchInfoList(listener, statusParams).sortBy(- _.batchId) - } -} - -private[v1] object AllBatchesResource { - - def batchInfoList( - listener: StreamingJobProgressListener, - statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = { - - listener.synchronized { - val statuses = - if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams - val statusToBatches = Seq( - BatchStatus.COMPLETED -> listener.retainedCompletedBatches, - BatchStatus.QUEUED -> listener.waitingBatches, - BatchStatus.PROCESSING -> listener.runningBatches - ) - - val batchInfos = for { - (status, batches) <- statusToBatches - batch <- batches if statuses.contains(status) - } yield { - val batchId = batch.batchTime.milliseconds - val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption - - new BatchInfo( - batchId = batchId, - batchTime = new Date(batchId), - status = status.toString, - batchDuration = listener.batchDuration, - inputSize = batch.numRecords, - schedulingDelay = batch.schedulingDelay, - processingTime = batch.processingDelay, - totalDelay = batch.totalDelay, - numActiveOutputOps = batch.numActiveOutputOp, - numCompletedOutputOps = batch.numCompletedOutputOp, - numFailedOutputOps = batch.numFailedOutputOp, - numTotalOutputOps = batch.outputOperations.size, - firstFailureReason = firstFailureReason - ) - } - - batchInfos - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala deleted file mode 100644 index 0eb649f0e1b72..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._ -import org.apache.spark.streaming.Time -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) { - - @GET - def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = { - outputOperationInfoList(listener, batchId).sortBy(_.outputOpId) - } -} - -private[v1] object AllOutputOperationsResource { - - def outputOperationInfoList( - listener: StreamingJobProgressListener, - batchId: Long): Seq[OutputOperationInfo] = { - - listener.synchronized { - listener.getBatchUIData(Time(batchId)) match { - case Some(batch) => - for ((opId, op) <- batch.outputOperations) yield { - val jobIds = batch.outputOpIdSparkJobIdPairs - .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted - - new OutputOperationInfo( - outputOpId = opId, - name = op.name, - description = op.description, - startTime = op.startTime.map(new Date(_)), - endTime = op.endTime.map(new Date(_)), - duration = op.duration, - failureReason = op.failureReason, - jobIds = jobIds - ) - } - case None => throw new NotFoundException("unknown batch: " + batchId) - } - }.toSeq - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala deleted file mode 100644 index 5a276a9236a0f..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.streaming.AllReceiversResource._ -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) { - - @GET - def receiversList(): Seq[ReceiverInfo] = { - receiverInfoList(listener).sortBy(_.streamId) - } -} - -private[v1] object AllReceiversResource { - - def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = { - listener.synchronized { - listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) => - - val receiverInfo = listener.receiverInfo(streamId) - val streamName = receiverInfo.map(_.name) - .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId") - val avgEventRate = - if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size) - - val (errorTime, errorMessage, error) = receiverInfo match { - case None => (None, None, None) - case Some(info) => - val someTime = - if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None - val someMessage = - if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None - val someError = - if (info.lastError.length > 0) Some(info.lastError) else None - - (someTime, someMessage, someError) - } - - new ReceiverInfo( - streamId = streamId, - streamName = streamName, - isActive = receiverInfo.map(_.active), - executorId = receiverInfo.map(_.executorId), - executorHost = receiverInfo.map(_.location), - lastErrorTime = errorTime, - lastErrorMessage = errorMessage, - lastError = error, - avgEventRate = avgEventRate, - eventRates = eventRates - ) - }.toSeq - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala index aea75d5a9c8d0..07d8164e1d2c0 100644 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala @@ -19,24 +19,39 @@ package org.apache.spark.status.api.v1.streaming import javax.ws.rs.{Path, PathParam} -import org.apache.spark.status.api.v1.ApiRequestContext +import org.apache.spark.status.api.v1._ +import org.apache.spark.streaming.ui.StreamingJobProgressListener @Path("/v1") private[v1] class ApiStreamingApp extends ApiRequestContext { @Path("applications/{appId}/streaming") - def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = { - withSparkUI(appId, None) { ui => - new ApiStreamingRootResource(ui) - } + def getStreamingRoot(@PathParam("appId") appId: String): Class[ApiStreamingRootResource] = { + classOf[ApiStreamingRootResource] } @Path("applications/{appId}/{attemptId}/streaming") def getStreamingRoot( @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ApiStreamingRootResource(ui) + @PathParam("attemptId") attemptId: String): Class[ApiStreamingRootResource] = { + classOf[ApiStreamingRootResource] + } +} + +/** + * Base class for streaming API handlers, provides easy access to the streaming listener that + * holds the app's information. + */ +private[v1] trait BaseStreamingAppResource extends BaseAppResource { + + protected def withListener[T](fn: StreamingJobProgressListener => T): T = withUI { ui => + val listener = ui.getStreamingJobProgressListener match { + case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener] + case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName) + } + listener.synchronized { + fn(listener) } } + } diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala index 1ccd586c848bd..a2571b910f615 100644 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala @@ -17,58 +17,180 @@ package org.apache.spark.status.api.v1.streaming -import javax.ws.rs.Path +import java.util.{Arrays => JArrays, Collections, Date, List => JList} +import javax.ws.rs.{GET, Path, PathParam, Produces, QueryParam} +import javax.ws.rs.core.MediaType import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.Time import org.apache.spark.streaming.ui.StreamingJobProgressListener +import org.apache.spark.streaming.ui.StreamingJobProgressListener._ import org.apache.spark.ui.SparkUI -private[v1] class ApiStreamingRootResource(ui: SparkUI) { - - import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._ +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class ApiStreamingRootResource extends BaseStreamingAppResource { + @GET @Path("statistics") - def getStreamingStatistics(): StreamingStatisticsResource = { - new StreamingStatisticsResource(getListener(ui)) + def streamingStatistics(): StreamingStatistics = withListener { listener => + val batches = listener.retainedBatches + val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration)) + val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay)) + val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay)) + val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay)) + + new StreamingStatistics( + startTime = new Date(listener.startTime), + batchDuration = listener.batchDuration, + numReceivers = listener.numReceivers, + numActiveReceivers = listener.numActiveReceivers, + numInactiveReceivers = listener.numInactiveReceivers, + numTotalCompletedBatches = listener.numTotalCompletedBatches, + numRetainedCompletedBatches = listener.retainedCompletedBatches.size, + numActiveBatches = listener.numUnprocessedBatches, + numProcessedRecords = listener.numTotalProcessedRecords, + numReceivedRecords = listener.numTotalReceivedRecords, + avgInputRate = avgInputRate, + avgSchedulingDelay = avgSchedulingDelay, + avgProcessingTime = avgProcessingTime, + avgTotalDelay = avgTotalDelay + ) } + @GET @Path("receivers") - def getReceivers(): AllReceiversResource = { - new AllReceiversResource(getListener(ui)) + def receiversList(): Seq[ReceiverInfo] = withListener { listener => + listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) => + val receiverInfo = listener.receiverInfo(streamId) + val streamName = receiverInfo.map(_.name) + .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId") + val avgEventRate = + if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size) + + val (errorTime, errorMessage, error) = receiverInfo match { + case None => (None, None, None) + case Some(info) => + val someTime = + if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None + val someMessage = + if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None + val someError = + if (info.lastError.length > 0) Some(info.lastError) else None + + (someTime, someMessage, someError) + } + + new ReceiverInfo( + streamId = streamId, + streamName = streamName, + isActive = receiverInfo.map(_.active), + executorId = receiverInfo.map(_.executorId), + executorHost = receiverInfo.map(_.location), + lastErrorTime = errorTime, + lastErrorMessage = errorMessage, + lastError = error, + avgEventRate = avgEventRate, + eventRates = eventRates + ) + }.toSeq.sortBy(_.streamId) } + @GET @Path("receivers/{streamId: \\d+}") - def getReceiver(): OneReceiverResource = { - new OneReceiverResource(getListener(ui)) + def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = { + receiversList().find { _.streamId == streamId }.getOrElse( + throw new NotFoundException("unknown receiver: " + streamId)) } + @GET @Path("batches") - def getBatches(): AllBatchesResource = { - new AllBatchesResource(getListener(ui)) + def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = { + withListener { listener => + val statuses = + if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams + val statusToBatches = Seq( + BatchStatus.COMPLETED -> listener.retainedCompletedBatches, + BatchStatus.QUEUED -> listener.waitingBatches, + BatchStatus.PROCESSING -> listener.runningBatches + ) + + val batchInfos = for { + (status, batches) <- statusToBatches + batch <- batches if statuses.contains(status) + } yield { + val batchId = batch.batchTime.milliseconds + val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption + + new BatchInfo( + batchId = batchId, + batchTime = new Date(batchId), + status = status.toString, + batchDuration = listener.batchDuration, + inputSize = batch.numRecords, + schedulingDelay = batch.schedulingDelay, + processingTime = batch.processingDelay, + totalDelay = batch.totalDelay, + numActiveOutputOps = batch.numActiveOutputOp, + numCompletedOutputOps = batch.numCompletedOutputOp, + numFailedOutputOps = batch.numFailedOutputOp, + numTotalOutputOps = batch.outputOperations.size, + firstFailureReason = firstFailureReason + ) + } + + batchInfos.sortBy(- _.batchId) + } } + @GET @Path("batches/{batchId: \\d+}") - def getBatch(): OneBatchResource = { - new OneBatchResource(getListener(ui)) + def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = { + batchesList(Collections.emptyList()).find { _.batchId == batchId }.getOrElse( + throw new NotFoundException("unknown batch: " + batchId)) } + @GET @Path("batches/{batchId: \\d+}/operations") - def getOutputOperations(): AllOutputOperationsResource = { - new AllOutputOperationsResource(getListener(ui)) + def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = { + withListener { listener => + val ops = listener.getBatchUIData(Time(batchId)) match { + case Some(batch) => + for ((opId, op) <- batch.outputOperations) yield { + val jobIds = batch.outputOpIdSparkJobIdPairs + .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted + + new OutputOperationInfo( + outputOpId = opId, + name = op.name, + description = op.description, + startTime = op.startTime.map(new Date(_)), + endTime = op.endTime.map(new Date(_)), + duration = op.duration, + failureReason = op.failureReason, + jobIds = jobIds + ) + } + case None => throw new NotFoundException("unknown batch: " + batchId) + } + ops.toSeq + } } + @GET @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}") - def getOutputOperation(): OneOutputOperationResource = { - new OneOutputOperationResource(getListener(ui)) + def oneOperation( + @PathParam("batchId") batchId: Long, + @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = { + operationsList(batchId).find { _.outputOpId == opId }.getOrElse( + throw new NotFoundException("unknown output operation: " + opId)) } -} + private def avgRate(data: Seq[Double]): Option[Double] = { + if (data.isEmpty) None else Some(data.sum / data.size) + } -private[v1] object ApiStreamingRootResource { - def getListener(ui: SparkUI): StreamingJobProgressListener = { - ui.getStreamingJobProgressListener match { - case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener] - case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName) - } + private def avgTime(data: Seq[Long]): Option[Long] = { + if (data.isEmpty) None else Some(data.sum / data.size) } + } diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala deleted file mode 100644 index d3c689c790cfc..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneBatchResource(listener: StreamingJobProgressListener) { - - @GET - def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = { - val someBatch = AllBatchesResource.batchInfoList(listener) - .find { _.batchId == batchId } - someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId)) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala deleted file mode 100644 index aabcdb29b0d4c..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener -import org.apache.spark.streaming.ui.StreamingJobProgressListener._ - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) { - - @GET - def oneOperation( - @PathParam("batchId") batchId: Long, - @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = { - - val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId) - .find { _.outputOpId == opId } - someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId)) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala deleted file mode 100644 index c0cc99da3a9c7..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) { - - @GET - def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = { - val someReceiver = AllReceiversResource.receiverInfoList(listener) - .find { _.streamId == streamId } - someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId)) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala deleted file mode 100644 index 6cff87be59ca8..0000000000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) { - - @GET - def streamingStatistics(): StreamingStatistics = { - listener.synchronized { - val batches = listener.retainedBatches - val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration)) - val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay)) - val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay)) - val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay)) - - new StreamingStatistics( - startTime = new Date(listener.startTime), - batchDuration = listener.batchDuration, - numReceivers = listener.numReceivers, - numActiveReceivers = listener.numActiveReceivers, - numInactiveReceivers = listener.numInactiveReceivers, - numTotalCompletedBatches = listener.numTotalCompletedBatches, - numRetainedCompletedBatches = listener.retainedCompletedBatches.size, - numActiveBatches = listener.numUnprocessedBatches, - numProcessedRecords = listener.numTotalProcessedRecords, - numReceivedRecords = listener.numTotalReceivedRecords, - avgInputRate = avgInputRate, - avgSchedulingDelay = avgSchedulingDelay, - avgProcessingTime = avgProcessingTime, - avgTotalDelay = avgTotalDelay - ) - } - } - - private def avgRate(data: Seq[Double]): Option[Double] = { - if (data.isEmpty) None else Some(data.sum / data.size) - } - - private def avgTime(data: Seq[Long]): Option[Long] = { - if (data.isEmpty) None else Some(data.sum / data.size) - } -}