Add an Amazon EMR on EKS provider package #16766
Digging into the failures. 🙃
All checks passing - will do a squash after I address any review comments.
Left some comments. I'm still struggling with correctly formatting my docs rst files, so I did not review those.
Trying to get all the tests to green - some of the (unrelated) integration tests are timing out for some reason.
Looks like my concerns were addressed.
# TODO: Make this logic a little bit more robust.
# Currently this polls until the state is *not* one of the INTERMEDIATE_STATES
# While that should work in most cases...it might not. :)
Can you tell me a little more about it?
After thinking about this a little bit more, I think my concern was more about the logic here solely relying on the INTERMEDIATE_STATES. What that means is if the API ever changes (not likely), the logic here could break. I think the only change I would make here would be a more explicit check if the query_state is actually in a completed state...but that's the current logic anyway because there's either None state, INTERMEDIATE state, or COMPLETED state.
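A minimal sketch of the "more explicit check" described above, in case it helps the follow-up task. The state names and their grouping are assumptions modeled on the EMR on EKS job-run states, and `is_terminal` / `should_keep_polling` are hypothetical helpers, not code from this PR.

```python
from typing import Optional

# Assumed state names and grouping; adjust to whatever the hook actually defines.
INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
SUCCESS_STATES = ("COMPLETED",)
TERMINAL_STATES = FAILURE_STATES + SUCCESS_STATES


def is_terminal(query_state: Optional[str]) -> bool:
    """Return True only for a state that is explicitly known to be final."""
    return query_state in TERMINAL_STATES


def should_keep_polling(query_state: Optional[str]) -> bool:
    """Keep polling on None, intermediate, or unrecognized states."""
    return not is_terminal(query_state)
```

With this shape, a state the API adds later is treated as "not finished yet" instead of being reported as a final state, which is the failure mode the TODO worries about. The trade-off is that an unknown final state would only stop the loop through max_tries.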
Opened followup task #19877
We currently have one operator that allows us to run Spark jobs on Kubernetes: SparkKubernetesOperator. It works with both EKS and GCP as well as any other Kubernetes platform. Why would anyone use this operator instead of the generic operator for Kubernetes?
This one is specifically for the Amazon EMR runtime. AWS customers today run EMR on EC2, but EMR on EKS was introduced last year as a way to run EMR jobs on EKS. In addition to simply running Spark jobs, it also provides the EMR Spark runtime, hosted Spark UI, integration with EMR Studio, and additional functionality like S3/CloudWatch logging, pod templates, and custom containers based on the EMR runtime. You can see more about what EMR on EKS provides here: https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks.html
@mik-laj I updated the documentation to help folks understand what EMR on EKS is and how to use the operator. Let me know what you think!
while True:
    query_state = self.check_query_status(job_id)
    if query_state is None:
        self.log.info("Try %s: Invalid query state. Retrying again", try_number)
    elif query_state in self.INTERMEDIATE_STATES:
        self.log.info("Try %s: Query is still in an intermediate state - %s", try_number, query_state)
    else:
        self.log.info("Try %s: Query execution completed. Final state is %s", try_number, query_state)
        final_query_state = query_state
        break
    if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
        final_query_state = query_state
        break
    try_number += 1
    sleep(poll_interval)
return final_query_state
Have you thought about jobs that are not expected to terminate in a relatively short time? If I submit a Spark streaming job using this operator, then my job needs to keep running for a long time. Do you need to implement some kind of backoff-based status checks for this?
@wanderijames That's a good point. I don't know if we need to implement a backoff, since we already have the option to change the poll interval. But that's also where I think your PR is nice, in that it has the operator just start the job.
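For reference, a rough sketch of what a backoff-based variant of the polling loop could look like. This is not what the PR implements (the PR uses a fixed poll_interval); `poll_with_backoff`, its parameters, and the state names are hypothetical, and the status check and sleep function are passed in so the sketch runs without AWS.

```python
import time
from typing import Callable, Optional

INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")  # assumed state names


def poll_with_backoff(
    check_status: Callable[[], Optional[str]],
    initial_interval: float = 30.0,
    max_interval: float = 600.0,
    factor: float = 2.0,
    max_tries: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll until a non-intermediate state appears, growing the wait each try."""
    interval = initial_interval
    try_number = 1
    while True:
        state = check_status()
        # A state that is neither None nor intermediate is treated as final.
        if state is not None and state not in INTERMEDIATE_STATES:
            return state
        # Give up after max_tries and return whatever state was last seen.
        if max_tries and try_number >= max_tries:
            return state
        sleep(interval)
        # Exponential backoff, capped so long-running jobs are still checked.
        interval = min(interval * factor, max_interval)
        try_number += 1
```

For a streaming job that never terminates, even a capped backoff keeps a worker slot busy, so starting the job without waiting (as mentioned above) may still be the better fit.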
Cool. Thanks
@ferruzzi @o-nikolas Any opinions on this?
… and hook for running Spark jobs on EMR on EKS as well as docs and an example DAG
@ashb Thanks for pinging folks - I just pushed one more update with a little documentation cleanup.
@dacort Docs have some issues. Can you look at it?
@mik-laj It doesn't look like the build failures are related to the PR? The errors are in the |
@mik-laj Looks like they succeeded after a rebase!
The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
Adds a new provider package in amazon/aws.
EMR on EKS is a new deployment model for EMR that allows you to run Spark jobs on EKS.
This package adds a new operator, sensor, and hook for running and monitoring the jobs as well as docs and an example DAG.
I've tested this locally on the latest main branch using breeze.
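To give a feel for what "running a job" on EMR on EKS involves, here is a small sketch that assembles the request for the boto3 `emr-containers` client's `start_job_run` call. `build_start_job_run_request` is a hypothetical helper rather than code from this PR, and every value in the usage comment (cluster ID, role ARN, S3 path) is a placeholder assumption.

```python
import uuid
from typing import Any, Dict, List, Optional


def build_start_job_run_request(
    name: str,
    virtual_cluster_id: str,
    execution_role_arn: str,
    release_label: str,
    entry_point: str,
    spark_submit_parameters: str = "",
    entry_point_arguments: Optional[List[str]] = None,
    client_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble keyword arguments for an EMR on EKS StartJobRun request."""
    return {
        "name": name,
        "virtualClusterId": virtual_cluster_id,
        # The client token makes the request idempotent if it is retried.
        "clientToken": client_token or str(uuid.uuid4()),
        "executionRoleArn": execution_role_arn,
        "releaseLabel": release_label,
        "jobDriver": {
            "sparkSubmitJobDriver": {
                "entryPoint": entry_point,
                "entryPointArguments": entry_point_arguments or [],
                "sparkSubmitParameters": spark_submit_parameters,
            }
        },
    }


# Usage (placeholder values; requires boto3 and AWS credentials):
# request = build_start_job_run_request(
#     name="pi-example",
#     virtual_cluster_id="<virtual-cluster-id>",
#     execution_role_arn="<execution-role-arn>",
#     release_label="emr-6.3.0-latest",
#     entry_point="s3://<bucket>/scripts/pi.py",
# )
# boto3.client("emr-containers").start_job_run(**request)
```

The operator, sensor, and hook in this package wrap this kind of call and the status polling that follows it, so DAG authors would not normally build the request by hand.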