Conversation

@attilapiros
Contributor

@attilapiros commented Jun 25, 2018

What changes were proposed in this pull request?

This PR introduces metrics for YARN. As there were no metrics in the YARN module up to now, a new metrics system is created with the name "applicationMaster".
To support both client and cluster mode, the metrics system's lifecycle is bound to the AM.
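As a rough sketch (simplified; the source class and registration details evolve later in this review), the intended lifecycle binding looks like this:

// Sketch only, not the exact PR code: the AM creates its own metrics
// system, registers a source backed by the YarnAllocator, and reports
// and stops the system on shutdown.
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
ms.registerSource(new ApplicationMasterSource(allocator))
ms.start()
// ... the AM runs ...
ms.report()
ms.stop()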

How was this patch tested?

Both client and cluster mode were tested manually.
Before the test, spark-core was removed from one of the YARN nodes to cause allocation failures.
Spark was started as follows (in client mode):

spark2-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true" --conf "spark.blacklist.application.maxFailedExecutorsPerNode=2" --conf "spark.dynamicAllocation.enabled=true" --conf "spark.metrics.conf.*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink" \
  --master yarn \
  --deploy-mode client \
  original-spark-examples_2.11-2.4.0-SNAPSHOT.jar \
  1000

In both cases the YARN logs contained the new metrics:

$ yarn logs --applicationId application_1529926424933_0015 
...
-- Gauges ----------------------------------------------------------------------
application_1531751594108_0046.applicationMaster.numContainersPendingAllocate
             value = 0
application_1531751594108_0046.applicationMaster.numExecutorsFailed
             value = 3
application_1531751594108_0046.applicationMaster.numExecutorsRunning
             value = 9
application_1531751594108_0046.applicationMaster.numLocalityAwareTasks
             value = 0
application_1531751594108_0046.applicationMaster.numReleasedContainers
             value = 0
...

@SparkQA

SparkQA commented Jun 25, 2018

Test build #92301 has finished for PR 21635 at commit 9b033cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private var master: ApplicationMaster = _


Contributor

nit: remove

Contributor Author

ok


private val securityMgr = new SecurityManager(sparkConf)

private[spark] val failureTracker = new FailureTracker(sparkConf, new SystemClock)
Contributor

No need to specify the clock here?

Contributor Author

As other metrics from YarnAllocator will be added, this change is reverted.

override val sourceName: String = "yarn_cluster"
override val metricRegistry: MetricRegistry = new MetricRegistry()

metricRegistry.register(
Contributor

The mechanics of adding the metric source are ok, but have you thought of other metrics to expose? YarnAllocator has a lot of things that could be easily hooked up here.

Contributor

Agreed, creating a metric source with only one metric seems overkill. Maybe we can mix this into FailureTracker.

Contributor Author

I have added some new metrics. So the current metrics are (a registration sketch follows the list):

  • yarn.numExecutorsFailed
  • yarn.numExecutorsRunning
  • yarn.numLocalityAwareTasks
  • yarn.numPendingLossReasonRequests
  • yarn.numReleasedContainers
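For reference, a minimal sketch of how one such gauge could be registered against the YarnAllocator, mirroring the pattern visible in the diff hunks below (a sketch, not the exact PR code):

import com.codahale.metrics.{Gauge, MetricRegistry}

// Sketch: each metric is a Dropwizard gauge that reads a counter
// YarnAllocator already maintains.
metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getNumExecutorsFailed
})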

@attilapiros changed the title from "[SPARK-24594][YARN] Introducing metrics for YARN executor allocation problems" to "[SPARK-24594][YARN] Introducing metrics for YARN" on Jun 26, 2018
@SparkQA

SparkQA commented Jun 26, 2018

Test build #92341 has finished for PR 21635 at commit 4968ad6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
metricsSystem.report()
Contributor

@tgravescs commented Jun 27, 2018

Add yarn to the monitoring.md doc as a component.

Contributor Author

Thanks Tom, documentation is added.

@SparkQA

SparkQA commented Jun 27, 2018

Test build #92385 has finished for PR 21635 at commit 9735525.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
metricsSystem.report()
Contributor

metricsSystem can be null at this point, can't it? For instance, in case of some issue during startup.

Contributor Author

Yes, it is better and more elegant to store metricsSystem in an Option.
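A minimal sketch of the Option-based handling (field and call sites simplified from the PR):

// Sketch: metricsSystem is None until startup succeeds, so the
// shutdown path becomes a safe no-op if initialization failed.
private var metricsSystem: Option[MetricsSystem] = None
// during startup:
metricsSystem = Some(MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr))
// during shutdown:
metricsSystem.foreach { ms =>
  ms.report()
  ms.stop()
}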

override def getValue: Int = yarnAllocator.numLocalityAwareTasks
})

}
Contributor

The size of getPendingAllocate might be an interesting metric, but we need to check whether it requires synchronization... and it may be an expensive operation; I am not sure if the AM client has a better API to get the number of pending requests.

Contributor Author

Yes, I have seen that the call goes to YARN and I was also afraid about its execution time, which is why I ultimately decided to leave it out.

But I will check whether there is something better to get it.

Contributor Author

getPendingAllocate seems quite cheap to me, as it just uses local maps (and tables) to calculate a list of ContainerRequests.

@SparkQA

SparkQA commented Jun 27, 2018

Test build #92396 has finished for PR 21635 at commit a8f3146.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 27, 2018

Test build #92397 has finished for PR 21635 at commit b0ee4ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

})

metricRegistry.register(MetricRegistry.name("numPendingLossReasonRequests"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.getNumPendingLossReasonRequests
Contributor

I'm not sure how useful this metric is; did you have a specific use case?

Contributor Author

Sorry, I have no use case for it. I added it because previous comments requested more metrics and this one was easy to collect. If it is totally useless then it is better to remove it.

Contributor

Yeah, I would leave it out if no one specifically requested it and we can't think of a use case. It's easier to add later than to remove.

@SparkQA

SparkQA commented Jul 2, 2018

Test build #92534 has finished for PR 21635 at commit 68ba47e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override def getValue: Int = yarnAllocator.numLocalityAwareTasks
})

metricRegistry.register(MetricRegistry.name("numPendingAllocate"), new Gauge[Int] {
Contributor

Should we give these a clearer name, like numContainersPendingAllocate?

Contributor Author

Sure we can do that.

@tgravescs
Contributor

One minor naming question, otherwise LGTM.

"Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
metricsSystem.foreach { ms =>
ms.report()
Contributor

Are there any issues with this if the AM gets killed badly or OOMs? Basically, would we want to wrap this in a try-catch and ignore any exceptions from it?

Contributor Author

In case of OOM or any interrupt exception I expect Line 309 to catch the exception and log the error. But the exit code can be lost if the metrics system throws a new exception, so to be on the safe side I added a try-catch to avoid this.
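A minimal sketch of the guarded shutdown described here (the logWarning reflects a later review suggestion; the revision under review used logInfo):

// Sketch: a failure while flushing metrics must not mask the AM's
// real exit code, so any exception is caught and only logged.
try {
  metricsSystem.foreach { ms =>
    ms.report()
    ms.stop()
  }
} catch {
  case e: Exception =>
    logWarning("Exception during stopping of the metric system: ", e)
}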

@SparkQA

SparkQA commented Jul 10, 2018

Test build #92819 has finished for PR 21635 at commit f3781bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2018

Test build #92825 has finished for PR 21635 at commit f3781bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92927 has finished for PR 21635 at commit 6751ec5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
* `shuffleService`: The Spark shuffle service.
* `yarn`: Spark resource allocations on YARN.
Contributor

Would it be better to change this to "applicationMaster" for better understanding?

Contributor Author

Sure, we can do that. After this many changes I would like to test it on a cluster again.
I will come back with the results soon.

Contributor Author

Successfully retested on cluster.

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92934 has finished for PR 21635 at commit c0c4748.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

+1. @jerryshao

}
} catch {
case e: Exception =>
logInfo("Exception during stopping of the metric system: ", e)
Contributor

I would suggest changing this to a warning log if an exception occurred.


private[spark] class ApplicationMasterSource(yarnAllocator: YarnAllocator) extends Source {

override val sourceName: String = "applicationMaster"
Contributor

If this is the metrics output:

-- Gauges ----------------------------------------------------------------------
applicationMaster.numContainersPendingAllocate
             value = 0
applicationMaster.numExecutorsFailed
             value = 3
applicationMaster.numExecutorsRunning
             value = 9
applicationMaster.numLocalityAwareTasks
             value = 0
applicationMaster.numReleasedContainers
             value = 0
...

I would suggest adding the application id as a prefix to differentiate between different apps.

Contributor

Ah, good catch. I was thinking it automatically added the namespace, but it looks like that only happens for executor and driver instances. Perhaps we should just make this a system instance that appends the spark.metrics.namespace setting. For YARN I see the applicationMaster metrics the same way as the DAG scheduler source, executor allocation manager, etc. Allowing the user to control this makes sense to me. Thoughts?

Contributor

@tgravescs Would you please explain more: are you going to add a new configuration "spark.metrics.namespace", and how would you use this configuration?

Contributor

The config spark.metrics.namespace already exists; see the metrics section in http://spark.apache.org/docs/latest/monitoring.html. But if you look at the code (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L129), it is only applied to executor and driver metrics. I think we should have it apply to the yarn metrics as well.

Contributor

I see. But I think we may not get "spark.app.id" on the AM side; instead we can get the YARN application id. So either we can set this configuration with the application id, or directly prepend it to the source name.

Contributor Author

I like the idea of making the metric names more app-specific, so I will prepend the app ID to the source name and rerun my test.

Contributor

Ah, for client mode, yes, there is an ordering issue with spark.app.id. I'm fine with using the YARN app id since that is essentially what the driver and executors use anyway, but I think we should also make it configurable. I would like to see these stay consistent: if the user can set the driver/executor metrics namespace with spark.metrics.namespace, we should allow them to set the yarn ones too so that they all could have a similar prefix. Perhaps we add a spark.yarn.metrics.namespace? (A sketch of the proposed fallback follows the examples below.)

application_1530654167152_24008.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.ExecutorAllocationManager$ExecutorAllocationListener
application_1530654167152_25538.2.executor.recordsRead
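A minimal sketch of the fallback being discussed (config name as proposed; the wiring is simplified, appId is assumed to be the YARN application id as a string, and the config entry is assumed to be declared as an optional string config):

// Sketch: prefer the user-set namespace and fall back to the YARN
// application id, so metrics from different apps stay distinguishable.
val prefix = sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))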

* `driver`: The Spark driver process (the process in which your SparkContext is created).
* `shuffleService`: The Spark shuffle service.
* `applicationMaster`: The Spark application master on YARN.

Contributor

I think it would be better to clarify as "The Spark ApplicationMaster when running on YARN."

Contributor Author

Thanks, updated accordingly.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93293 has finished for PR 21635 at commit 7958525.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93297 has finished for PR 21635 at commit 6761098.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

+1 @jerryshao

.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100s")

private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
Contributor

Can you please add this configuration to the yarn doc?
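For reference, a user could then opt into a custom prefix like any other Spark setting; a hypothetical example (the value "my_app" is illustrative):

// Sketch: overriding the default application-id prefix with a stable name.
val conf = new SparkConf().set("spark.yarn.metrics.namespace", "my_app")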

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93439 has finished for PR 21635 at commit 0b86788.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jerryshao left a comment

LGTM, merging to master branch.

@asfgit closed this in d2436a8 on Jul 24, 2018