
Minerkasch/spark #2407

Merged
merged 1 commit into DataDog:master on Apr 26, 2016

Conversation

zachradtka
Contributor

An AgentCheck that gathers metrics for Apache Spark

The metrics collected are:

Spark Job Metrics
-----------------
spark.job.num_tasks
spark.job.num_active_tasks
spark.job.num_completed_tasks
spark.job.num_skipped_tasks
spark.job.num_failed_tasks
spark.job.num_active_stages
spark.job.num_completed_stages
spark.job.num_skipped_stages
spark.job.num_failed_stages

Spark Stage Metrics
-------------------
spark.stage.num_active_tasks
spark.stage.num_complete_tasks
spark.stage.num_failed_tasks
spark.stage.executor_run_time
spark.stage.input_bytes
spark.stage.input_records
spark.stage.output_bytes
spark.stage.output_records
spark.stage.shuffle_read_bytes
spark.stage.shuffle_read_records
spark.stage.shuffle_write_bytes
spark.stage.shuffle_write_records
spark.stage.memory_bytes_spilled
spark.stage.disk_bytes_spilled

Spark Executor Metrics
----------------------
spark.executor.rdd_blocks
spark.executor.memory_used
spark.executor.disk_used
spark.executor.active_tasks
spark.executor.failed_tasks
spark.executor.completed_tasks
spark.executor.total_tasks
spark.executor.total_duration
spark.executor.total_input_bytes
spark.executor.total_shuffle_read
spark.executor.total_shuffle_write
spark.executor.max_memory

Spark RDD Metrics
-----------------
spark.rdd.num_partitions
spark.rdd.num_cached_partitions
spark.memory_used
spark.rdd.disk_used
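
For context, the check follows the standard dd-agent AgentCheck pattern. A minimal sketch of the idea (not the actual check; the spark_url option and the single endpoint used here are simplifications for illustration):

import requests
from checks import AgentCheck  # base class shipped with dd-agent


class SparkCheck(AgentCheck):
    def check(self, instance):
        # Simplified: list the running applications, then report one of the
        # executor metrics above as a histogram, tagged with the app name.
        base_url = instance['spark_url']  # assumed option, e.g. http://localhost:4040
        apps = requests.get('%s/api/v1/applications' % base_url).json()
        for app in apps:
            executors_url = '%s/api/v1/applications/%s/executors' % (base_url, app['id'])
            for executor in requests.get(executors_url).json():
                self.histogram('spark.executor.memory_used',
                               executor['memoryUsed'],
                               tags=['app_name:%s' % app['name']])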

'totalDuration': ('spark.executor.total_duration', HISTOGRAM),
'totalInputBytes': ('spark.executor.total_input_bytes', HISTOGRAM),
'totalShuffleRead': ('spark.executor.total_shuffle_read', HISTOGRAM),
'totalShuffleWrite': ('spark.executor.HISTOGRAM', HISTOGRAM),
Member

typo in the metric name here

@olivielpeau
Member

Thanks @zachradtka! I've added a few comments here and there, but overall the check looks good!

Once you've addressed them, we'll test the check on our own Spark setup; I may have more feedback for you then.

@zachradtka
Contributor Author

@olivielpeau, thanks for the change suggestions! All have been addressed.

@ascii766164696D

@zachradtka

    spark
    -----
      - instance #0 [ERROR]: '404 Client Error: Not Found'
        Traceback (most recent call last):
          File "/home/hadoop/dd-agent-spark/checks/__init__.py", line 746, in run
            self.check(copy.deepcopy(instance))
          File "/home/hadoop/dd-agent-spark/checks.d/spark.py", line 155, in check
            self._spark_job_metrics(running_apps, cluster_id)
          File "/home/hadoop/dd-agent-spark/checks.d/spark.py", line 220, in _spark_job_metrics
            SPARK_SERVICE_CHECK, app_name, 'jobs')
          File "/home/hadoop/dd-agent-spark/checks.d/spark.py", line 325, in _rest_request_to_json
            response.raise_for_status()
          File "/home/hadoop/dd-agent-spark/venv/local/lib/python2.7/site-packages/requests/models.py", line 834, in raise_for_status
            raise HTTPError(http_error_msg, response=self)
        HTTPError: 404 Client Error: Not Found

Instead of:
http://hostname:20888/proxy/application_1460395170645_0001/api/v1/applications/Zeppelin/jobs

this should be called:
http://hostname:20888/proxy/application_1460395170645_0001/api/v1/applications/application_1460395170645_0001/jobs

@zachradtka
Contributor Author

@buryat: Thanks for the feedback.

What versions of Spark and YARN are you using?

The URL that you gave me is a bit interesting, because it uses api/v1/[app-id]/jobs. The Spark REST API says the endpoint should be api/v1/applications/[app-id]/jobs. It is a subtle difference, but the word applications should appear in the endpoint. Could there have been a typo in the URL you posted?

Also, could you run Zeppelin again and give me the output of http://hostname:20888/proxy/application_1460395170645_0001/api/v1/applications? That should give you the list of running Spark applications for that YARN app-id. I am testing with HDP, and the IDs of my Spark applications happen to also be the names of the YARN applications. I am guessing that the IDs of your Spark applications are the YARN application IDs.

With that I should be able to fix the errors you are seeing.

Thanks!

@ascii766164696D

@zachradtka yeah, I'm sorry, I made a typo when I was redacting the hostname. I've corrected my original comment.

I use AWS EMR 4.3.0, which runs Hadoop 2.7.1 and Spark 1.6.0.

curl "hostname:20888/proxy/application_1460395170645_0002/api/v1/applications"
[ {
  "id" : "application_1460395170645_0002",
  "name" : "Zeppelin",
  "attempts" : [ {
    "startTime" : "2016-04-12T00:42:28.934GMT",
    "endTime" : "1969-12-31T23:59:59.999GMT",
    "sparkUser" : "",
    "completed" : false
  } ]
} ]

curl "hostname:20888/proxy/application_1460395170645_0002/api/v1/applications/application_1460395170645_0002"
{
  "id" : "application_1460395170645_0002",
  "name" : "Zeppelin",
  "attempts" : [ {
    "startTime" : "2016-04-12T00:42:28.934GMT",
    "endTime" : "1969-12-31T23:59:59.999GMT",
    "sparkUser" : "",
    "completed" : false
  } ]
} 

@zachradtka
Contributor Author

@buryat Thanks for the response and for the output.

I updated the integration to get the Spark application IDs from the Spark REST interface. That should fix the problems you were seeing before.
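
For reference, the app-id lookup now works roughly like this (a simplified sketch using plain requests rather than the check's actual helpers; hostname is a placeholder):

import requests

def get_running_spark_apps(proxy_address, yarn_app_id):
    # Query the Spark REST API through the YARN proxy and map each Spark
    # application name to its application ID.
    url = '%s/proxy/%s/api/v1/applications' % (proxy_address, yarn_app_id)
    response = requests.get(url)
    response.raise_for_status()
    return dict((app['name'], app['id']) for app in response.json())

# Against the output above:
# get_running_spark_apps('http://hostname:20888', 'application_1460395170645_0002')
# -> {'Zeppelin': 'application_1460395170645_0002'}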

I also updated the unit tests to reflect my code changes.

Please let me know how it goes!

@zachradtka
Contributor Author

@olivielpeau
I added the following metrics for the driver:

spark.driver.rdd_blocks
spark.driver.memory_used
spark.driver.disk_used
spark.driver.active_tasks
spark.driver.failed_tasks
spark.driver.completed_tasks
spark.driver.total_tasks
spark.driver.total_duration
spark.driver.total_input_bytes
spark.driver.total_shuffle_read
spark.driver.total_shuffle_write
spark.driver.max_memory

I added the following metrics to easily get the job, stage, and executor count:

spark.job.count
spark.stage.count
spark.executor.count

I also added the ability to set custom tags in the YAML file and required users to enter a cluster_id.
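
For illustration, the tag handling is roughly as follows (a sketch; the 'tags' instance key is an assumption, not necessarily the final option name):

def build_instance_tags(instance):
    # Custom tags come from the instance entry in the YAML config ('tags' key
    # assumed); cluster_id is required in the instance and added as a tag.
    tags = list(instance.get('tags', []))
    tags.append('cluster_id:%s' % instance['cluster_id'])
    return tags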

Let me know if there are any more requests.

self.service_check(SPARK_SERVICE_CHECK,
                   AgentCheck.OK,
                   tags=['url:%s' % am_address],
                   message='Connection to ApplicationMaster "%s" was successful' % rm_address)
Member

You probably want am_address instead of rm_address here

@zachradtka
Contributor Author

I added the cluster_id tag pulled from YARN and made cluster_name optional in the YAML file. I also fixed an incorrectly named metric, spark.memory_used -> spark.rdd.memory_used, and added a metric for the number of RDDs: spark.rdd.count.

@ascii766164696D

When I specify cluster_name I get the following error:

2016-04-19 21:04:30,813 | ERROR | dd.collector | checks.spark(__init__.py:763) | Check 'spark' instance #1 failed
Traceback (most recent call last):
  File "/home/hadoop/dd-agent-spark/checks/__init__.py", line 746, in run
    self.check(copy.deepcopy(instance))
  File "/home/hadoop/dd-agent-spark/checks.d/spark.py", line 175, in check
    raise Exception('The ResourceManager URL must be specified in the instance configuration')
Exception: The ResourceManager URL must be specified in the instance configuration
Metrics:
[]
Events:
[]
Service Checks:
[{'check': 'spark.resource_manager.can_connect',
  'host_name': 'i-ab0d2330',
  'id': 1,
  'message': u'Connection to ResourceManager "http://ip-10-154-242-131.ec2.internal:8088" was successful',
  'status': 0,
  'tags': ['url:http://ip-10-154-242-131.ec2.internal:8088'],
  'timestamp': 1461099870.813359}]
Service Metadata:
[{}, {}]
    spark
    -----
      - instance #0 [OK]
      - instance #1 [ERROR]: 'The ResourceManager URL must be specified in the instance configuration'
        Traceback (most recent call last):
          File "/home/hadoop/dd-agent-spark/checks/__init__.py", line 746, in run
            self.check(copy.deepcopy(instance))
          File "/home/hadoop/dd-agent-spark/checks.d/spark.py", line 175, in check
            raise Exception('The ResourceManager URL must be specified in the instance configuration')
        Exception: The ResourceManager URL must be specified in the instance configuration

      - Collected 0 metrics, 0 events & 1 service check

Config file:

init_config:

instances:
  - resourcemanager_uri: http://ip-10-154-242-131.ec2.internal:8088
  - cluster_name: MySparkCluster

@zachradtka
Contributor Author

@buryat

I just checked my setup and I was able to replicate your error and fix it by removing the - before cluster_name.

Try the following config:

init_config:

instances:
  - resourcemanager_uri: http://ip-10-154-242-131.ec2.internal:8088
    cluster_name: MySparkCluster

I am not the best at YAML, but I believe the - starts a new list item, so putting a - before cluster_name made it a second, separate instance rather than a key on the first instance. If there is a way to make this clearer, or a convention you use for other checks, please let me know.

@ascii766164696D

@zachradtka oh, I'm so sorry, that was completely my mistake.
I confirm that it works now.

@olivielpeau
Member

@buryat has some additional feedback on the stage metrics: they're useful for tracking the overall progress of the jobs, but they don't capture the current state of the active stages very well, since they're aggregated over all stages (including the non-active ones).

To address this, the check could either send stage.active metrics that capture the values of the active stages only, or tag the stage metrics with a state tag (in which case the histograms would be computed separately for each value taken by the state).

@zachradtka What do you think?

@zachradtka
Contributor Author

@olivielpeau It would probably be best to tag the metric with the state. I am not sure what complexity that will add, but I will look into it.
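
Concretely, the state-tag approach could look something like this (a rough sketch, not the check's actual code; 'status' is the field the Spark stages endpoint returns, e.g. ACTIVE or COMPLETE):

def stage_tags(base_tags, stage):
    # Tag each stage's metrics with its status so histograms are computed
    # separately per state (ACTIVE, COMPLETE, FAILED, ...).
    return base_tags + ['status:%s' % stage.get('status', 'unknown').lower()]

# Example:
# stage_tags(['cluster_name:MySparkCluster'], {'status': 'ACTIVE'})
# -> ['cluster_name:MySparkCluster', 'status:active']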

- resourcemanager_uri: http://localhost:8088

# An optional friendly name can be specified for the cluster.
# cluster_name: MySparkCluster
Member

nitpick: could you add two spaces of indentation on these two lines (given that they're at the same level as the tags, i.e. at the instance level)

@zachradtka
Contributor Author

@olivielpeau I made a few updates:

  • Required cluster_name in the YAML file and tagged all metrics with cluster_name
  • Removed cluster_id as a tag to avoid an excessive number of tag combinations
  • Used increment for all of the spark.*.count metrics
  • Used increment for the spark.driver.* metrics, since there should be only one driver per application
  • Added status tags for jobs and stages, allowing greater fidelity for metrics within a particular status

self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS)

if len(response):
    self._set_metric('spark.job.count', INCREMENT, len(response), tags)
Member

@olivielpeau Apr 22, 2016

The tags that are passed here are the tags of the last job of the loop above, so the status tag won't be accurate.

Instead we should either:

  • increment this count by 1 for every job, and with the status tag (in order to have a count per status)
  • increment this count by len(response) only once per app, and without the status tag (in order to have only a global count for all statuses)

I would go with option 1 since having a count per status sounds useful to me, but let me know what you think.
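
A rough sketch of option 1 (standalone illustration, not the check's code; one increment per job, tagged with that job's status):

def per_status_job_counts(jobs):
    # Emit one increment of 1 per job, tagged with the job's status, so
    # spark.job.count can be broken down per status.
    increments = []
    for job in jobs:
        status_tag = 'status:%s' % job.get('status', 'unknown').lower()
        increments.append(('spark.job.count', 1, [status_tag]))
    return increments

# With jobs [{'status': 'RUNNING'}, {'status': 'SUCCEEDED'}] this yields:
# [('spark.job.count', 1, ['status:running']),
#  ('spark.job.count', 1, ['status:succeeded'])]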

This applies to the spark.stage.count metric too.

Also, if possible it'd be good to have tests that reflect how different statuses on the jobs and stages should be counted.

@olivielpeau
Member

@zachradtka Added in one comment on your latest changes

@zachradtka
Contributor Author

@olivielpeau Thanks for the feedback; I should have caught the count with the incorrect tag.

That has been fixed and tests have been added.

@olivielpeau
Member

@zachradtka Looks good to me, thanks! Could you squash your commits into one?

@zachradtka
Contributor Author

@olivielpeau Commits squashed!

@olivielpeau
Member

Merging! 🎉

@olivielpeau merged commit aa73259 into DataDog:master on Apr 26, 2016