
AWS ECS Executor #34381

Merged
merged 62 commits into from
Oct 25, 2023

Conversation

o-nikolas
Contributor

Overview

Introducing the AWS ECS executor!

Over the past couple of months, @ferruzzi, @syedahsn, and I have been hard at work on an AWS ECS executor (if you haven't seen us as much in the community, this is why 😅), based on an initial contribution from @aelzeiny.

From the headline sentence in the README:

This is an Airflow executor powered by Amazon Elastic Container Service (ECS). Each task that Airflow schedules for execution is run within its own ECS container.

This is an initial release with most of the basic functionality in place. We have many future upgrades, CLI commands, and other features planned, so stay tuned!

Review Notes

This is certainly a large PR, which in some ways is not ideal. However, it can be difficult to release portions of a component like this. We've tried to scope all the code here to the minimum required for folks to begin using this executor.

When reviewing, I don't think it's required to read every single line of code. I have annotated most modules in the diff view with comments explaining the changes. Please take a look at the portions you think are relevant or that you have some experience in.

Ultimately, (almost) all the code is scoped to the Amazon provider package and it is a net new component, so there is a very limited blast radius; few existing user workflows or code paths should be affected. Included in the list below are the areas of existing code that have been updated, which have the possibility of affecting user workflows.

Some specific changes to pay particular attention to:

  • The new logging environment variable that enables task logs to work correctly in containerized executors. K8s has an existing approach for this, which we followed quite closely while making it generic. However, we did not convert K8s to this new mechanism, to keep the change set minimal and reduce blast radius.
  • Updates to boto user agent tagging in the Base Aws hook.
  • A config yaml is now present in the AWS provider, leveraging @potiuk's changes. This is not only new code but also the first time we're using this system, so it's worth reviewing carefully.
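The logging flag mentioned in the first bullet can be sketched as follows. This is an illustrative sketch only: the environment variable name and function names below are assumptions, not the actual identifiers introduced in this PR.

```python
import os

# Assumed name for illustration -- the PR's actual variable may differ. The idea:
# a generic flag (analogous to AIRFLOW_IS_K8S_EXECUTOR_POD) that any
# container-based executor can set so the task-runner process knows to ship
# logs to stdout, where the container runtime (e.g. CloudWatch) collects them.
EXECUTOR_CONTAINER_ENV_VAR = "AIRFLOW_IS_EXECUTOR_CONTAINER"

def running_in_executor_container() -> bool:
    """Return True when the task runs inside an executor-managed container."""
    return os.environ.get(EXECUTOR_CONTAINER_ENV_VAR, "").lower() in ("true", "1")

def configure_task_logging() -> str:
    # In a container, emit logs to stdout so the container log driver can
    # collect them; otherwise fall back to the usual file-based handler.
    return "stdout" if running_in_executor_container() else "file"
```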

Testing

There is extensive unit testing which has near 100% line coverage in most cases:
[Screenshot: unit test coverage report, 2023-09-14]

We also performed a lot of manual UAT for things such as:

  • Load testing (500 concurrent tasks, reaching the limit imposed by ECS, see the performance and tuning section of the README)
  • Different platform versions of ECS Fargate
  • Multiple ways to provide AWS credentials to the executor (built into the image, using Airflow connection, etc)
  • Deferrable operators
  • Dynamic task mapping
  • Data Driven Scheduling


aelzeiny and others added 30 commits September 13, 2023 16:48
…eExecutor and BatchExecutor launches Airflow tasks on AWS ECS/Fargate service and the AWS Batch service, respectively.
Run static checks and make all the obvious fixes.   Some will still fail until we have a better understanding of the code.
I have not yet checked for coverage or quality. I did the minimum required to: convert the tests, make sure they still pass, and get static checks happy.
Also moved the Executor logic into __init__ for shorter and more logical import statements and updated relevant unit tests.
…references to ECS which meant EC2

Also made some minor docstring tweaks
We do not validate the format of the returned json anywhere else, removing for consistency.  It may not be a terrible idea to implement that universally at a later date.
There is an existing env var AIRFLOW_IS_K8S_EXECUTOR_POD which the
kubernetes executor and pod operator use to enable logging, among other
things. This CR adds a similar, generic, env var that new container
based executors can use.

Note: This change intentionally does _not_ deprecate AIRFLOW_IS_K8S_EXECUTOR_POD
since it is leveraged for other usecases.
Added conn_id and alphabetized the entries for easier use.
Boto3 is a particularly slow import, so import that locally.

Also move helper classes out of main executor module to utils. This
isn't strictly necessary for import speed optimization, but I prefer
this logical separation.
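The lazy-import pattern described in this commit looks roughly like the sketch below. `get_ecs_client` is an illustrative name, not the executor's actual code; the second function demonstrates the same idea with a stdlib module so it is easy to verify.

```python
import sys

def get_ecs_client(region_name: str):
    # boto3 is imported inside the function rather than at module top level:
    # importing boto3 is slow, and executor modules get imported by the
    # Airflow CLI even for commands that never talk to AWS.
    import boto3  # deferred on purpose

    return boto3.client("ecs", region_name=region_name)

def parse_payload(raw: str):
    # Same lazy-import idea with a stdlib module: the import cost is only
    # paid when parsing actually happens.
    import json
    return json.loads(raw)
```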
The logging section is the only thing fleshed out so far
Also implemented AWS_CONN_ID config setting as it is used for the Hook.
Also include a bullet for updating the airflow.cfg
The subnet config test will fail when run with the other tests in the
module (e.g. just running pytest on the whole test module). This is
because Python only loads a module once: the ecs config module runs
its code and sets its values at the module level, so its behaviour is
unchanged for the remainder of the Python session. The module must
therefore be reloaded to test any changes.
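The reload behaviour described above can be demonstrated with a throwaway module (the module name and env var below are made up for illustration):

```python
import importlib
import os
import sys
import tempfile

# A stand-in for the real ecs config module: it reads the environment once,
# at import time, and freezes the result at module level.
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "ecs_cfg.py"), "w") as f:
    f.write("import os\nSUBNETS = os.environ.get('ECS_SUBNETS', 'none')\n")
sys.path.insert(0, tmp)

os.environ["ECS_SUBNETS"] = "subnet-a"
import ecs_cfg
first = ecs_cfg.SUBNETS   # 'subnet-a'

os.environ["ECS_SUBNETS"] = "subnet-b"
second = ecs_cfg.SUBNETS  # still 'subnet-a': module code ran only once

importlib.reload(ecs_cfg)  # re-executes module-level code
third = ecs_cfg.SUBNETS    # 'subnet-b'
```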
The attribute is expected to be a boto client, but was recently
changed to be a hook, which caused all api requests to fail. Updated the
attribute to be the Hook's conn (which is all we care about).

While I was there I moved all the initialization from start() to
__init__(). I don't see any reason we need to keep them separate.
Also fixes a wonky dependency on Region to make it more generic and adds an extensive docstring explaining why it is done this way.
If we don't catch exceptions in those interface methods, they bubble up
and kill the scheduler process itself.

Also catch any exception during the attempt to actually run tasks: if
there is any failure, catch and log it, and add the task back to the
pending queue.
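The defensive pattern described above might look like this sketch (the class, method names, and simulated failure are illustrative, not the executor's actual code):

```python
import logging
from collections import deque

logger = logging.getLogger(__name__)

class EcsExecutorSketch:
    """Illustration of catching launch failures so they never reach the scheduler."""

    def __init__(self):
        self.pending_tasks = deque()

    def attempt_task_runs(self):
        # Drain the queue; any failure to launch must not escape this method,
        # or it would bubble up and kill the scheduler process itself.
        for _ in range(len(self.pending_tasks)):
            task = self.pending_tasks.popleft()
            try:
                self._run_task(task)
            except Exception:
                logger.exception("Failed to run task %s; re-queueing", task)
                self.pending_tasks.append(task)  # retried on the next sync

    def _run_task(self, task):
        # Placeholder for the boto3 run_task call.
        raise RuntimeError("simulated ECS API failure")
```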
Some issues included:
- Some config options which should be nested in embedded dicts/lists
  were left at the root of the run task kwargs dict (for example subnets)
- The run task kwargs "override" was being overridden by us, since name
  and command must be a specific value.
- Unit tests were not catching the above.
@ferruzzi
Contributor

working on the merge conflict now

@o-nikolas
Contributor Author

@jedcunningham @Taragolis @eladkal,

We've addressed the feedback and gotten the build green once again. If you folks have time to take another look, and perhaps approve if all looks good, that would be greatly appreciated.

We've noted the executor as experimental in the docs (thanks again for that suggestion @eladkal), so we can still iterate on it for a while, and we'd like to get the bulk of it merged soon.

Thanks!

@eladkal (Contributor) left a comment


LGTM!

@eladkal eladkal merged commit 5f4d2b5 into apache:main Oct 25, 2023
1 check passed
@eladkal
Contributor

eladkal commented Oct 25, 2023

🎉 🎉 🎉 🎉 🎉 🎉

@Taragolis
Contributor

Nice.

Still try to find time to actually try it 🤣

@potiuk
Member

potiuk commented Oct 25, 2023

Nice!

@mshober

mshober commented Nov 6, 2023

Hi @o-nikolas. Just noticed this PR today. I'd like to give my feedback on this.

I've been using a fork of @aelzeiny's ECS executor for my company which runs about 180k tasks per day on our Airflow environment. We don't use Fargate; it is far too slow and expensive for the amount of tasks that we run. The original ECS Executor was very coupled to Fargate so much of the work I did was related to removing the Fargate specifics.

I'd love to use Airflow's official ECS Executor instead of having to support my own, but the current implementation is not suitable for my use case (and likely others).

Here's a few features that are must-haves for me:

Supporting Capacity Providers

The ECS Executor in this PR does not support Capacity Providers. It requires that users specify a LaunchType, and if you're specifying a LaunchType then you cannot specify a Capacity Provider.

Using Capacity Providers is essential for organizations that use EC2-hosted ECS Clusters. If I recall correctly, running tasks with the EC2 LaunchType will result in launch failures if there is not enough capacity to run them, whereas tasks run with Capacity Providers will go into a Provisioning state until there is enough capacity available to run the task. Capacity Providers are also very cost efficient, since you can use them to dynamically scale your EC2 instances based on demand.
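The mutual exclusivity described above could be handled by a small kwargs builder like the sketch below. The function name is made up for illustration, but `launchType` and `capacityProviderStrategy` are real, mutually exclusive parameters of the ECS `RunTask` API.

```python
from typing import Optional

def build_run_task_kwargs(cluster: str, task_def: str,
                          launch_type: Optional[str] = None,
                          capacity_provider_strategy: Optional[list] = None) -> dict:
    # ECS rejects requests that specify both, so fail fast in config validation.
    if launch_type and capacity_provider_strategy:
        raise ValueError("launchType and capacityProviderStrategy are mutually exclusive")
    kwargs = {"cluster": cluster, "taskDefinition": task_def}
    if capacity_provider_strategy:
        # Tasks queued via a capacity provider can sit in PROVISIONING until
        # auto-scaled EC2 capacity is available, instead of failing to launch.
        kwargs["capacityProviderStrategy"] = capacity_provider_strategy
    elif launch_type:
        kwargs["launchType"] = launch_type  # e.g. "FARGATE" or "EC2"
    return kwargs
```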

Overriding Additional ECS Task Properties

The executor config is scoped to overrides.containerOverrides. However there are relevant properties outside of overrides.containerOverrides that users may want to change.

For example, our ECS Cluster is actually composed of 3 capacity providers: A General-Purpose Capacity Provider (which is our cluster's default provider and runs on M7g instances), Memory-Optimized (R7g instances) and Compute-Optimized (C7g instances). My version of the ECS Executor allows users to set the appropriate Capacity Provider via the operator's executor_config param so that we can run our jobs in the most cost-efficient environment.

There are several other properties which Airflow users may want to set at the task level.
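One hypothetical way to support this (all names below are illustrative, not the executor's actual API) is to deep-merge the operator's `executor_config` into the default run_task kwargs, so any top-level property can be patched rather than only `overrides.containerOverrides`:

```python
from copy import deepcopy

def deep_merge(base: dict, patch: dict) -> dict:
    """Recursively overlay patch onto base without mutating either dict."""
    merged = deepcopy(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

default_kwargs = {
    "cluster": "airflow",
    "overrides": {"containerOverrides": [{"name": "airflow-task"}]},
}
# A task that should land on the memory-optimized capacity provider:
executor_config = {
    "capacityProviderStrategy": [{"capacityProvider": "mem-optimized", "weight": 1}]
}
run_task_kwargs = deep_merge(default_kwargs, executor_config)
```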

Adopting Task Instances

This is a must-have for us, as our deployments replace our scheduler instances.

There is a PR for that feature in @aelzeiny's executor. I had to make some changes to get that working properly. I can assist on a PR for this feature.

Increasing Throughput

The ECS Executor calls the ECS RunTask API sequentially. In our current environment, this leads to a maximum throughput of roughly 4 tasks launched per second per scheduler instance. This can cause issues for larger Airflow environments like my own, for example:

  • During peak times tasks often spend a long period in the scheduled state despite there being available capacity in the environment.
  • Larger values for max_tis_per_query can lead to missed heartbeats due to the length of time the Executor spends calling the RunTask API.

I haven't had a chance to implement an improvement for this in my own executor yet, but my thinking was to incorporate the same sync_parallelism logic that is currently used for the CeleryExecutor.
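The sync_parallelism idea could be sketched with a thread pool, since each RunTask call is a blocking HTTP round trip. This is a sketch of the proposal, not the executor's implementation; `run_task_stub` stands in for the real boto3 call.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task_stub(task_id: str) -> str:
    # Stand-in for ecs_client.run_task(...); returns an identifier on success.
    return f"arn:aws:ecs:::task/{task_id}"

def launch_all(task_ids, sync_parallelism: int = 8):
    # Launch up to sync_parallelism RunTask calls concurrently instead of
    # sequentially; pool.map preserves the input order of results.
    with ThreadPoolExecutor(max_workers=sync_parallelism) as pool:
        return list(pool.map(run_task_stub, task_ids))
```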

@Taragolis
Contributor

@mshober

This executor is experimental and will work with Airflow 2.8 (not released yet).

I'd love to use Airflow's official ECS Executor instead of having to support my own, but the current implementation is not suitable for my use case (and likely others).

The nice part is that this executor is part of a community provider, so everyone can propose changes and directly contribute improvements by making a PR.

@o-nikolas
Contributor Author

I'd love to use Airflow's official ECS Executor instead of having to support my own, but the current implementation is not suitable for my use case (and likely others).

Hey thanks for the feedback! We're working on adding more features to this executor and we welcome any PRs for code changes that you've made which you find are working well for you and your organization :)

Supporting Capacity Providers

The ECS Executor in this PR does not support Capacity Providers. It requires that users specify a LaunchType, and if you're specifying a LaunchType then you cannot specify a Capacity Provider.

This is a great request and should be a good first issue; I'll cut a GitHub issue for it. If you have code for it, feel free to submit a PR.

The executor config is scoped to overrides.containerOverrides. However there are relevant properties outside of overrides.containerOverrides that users may want to change.

For example, our ECS Cluster is actually composed of 3 capacity providers: A General-Purpose Capacity Provider (which is our cluster's default provider and runs on M7g instances), Memory-Optimized (R7g instances) and Compute-Optimized (C7g instances). My version of the ECS Executor allows users to set the appropriate Capacity Provider via the operator's executor_config param so that we can run our jobs in the most cost-efficient environment.

This is also a good request, and we should make the change while the executor is experimental and we can still adjust that behaviour easily. I'll cut a GitHub issue for it. If you have code for it, feel free to submit a PR.

Adopting Task Instances

This is a must-have for us, as our deployments replace our scheduler instances.

There is a PR for that feature in @aelzeiny's executor. I had to make some changes to get that working properly. I can assist on a PR for this feature.

Would definitely appreciate a PR!

Increasing Throughput

The ECS Executor calls the ECS RunTask API sequentially. In our current environment, this leads to a maximum throughput of roughly 4 tasks launched per second per scheduler instance. This can cause issues for larger Airflow environments like my own, for example:

* During peak times tasks often spend a long period in the scheduled state despite there being available capacity in the environment.

* Larger values for `max_tis_per_query` can lead to missed heartbeats due to the length of time the Executor spends calling the RunTask API.

I haven't had a chance to implement an improvement for this in my own executor yet, but my thinking was to incorporate the same sync_parallelism logic that is currently used for the CeleryExecutor.

Indeed, some performance tuning can be done with some of Airflow's knobs. I personally have gotten good results scheduling 500 tasks in a few minutes by increasing max_tis_per_query and relaxing the scheduler heartbeat a little, along with some other configs. But getting double-digit tasks scheduled per second for those very, very large scale deployments will indeed likely require some code changes to the executor (again, PRs welcome 😀).

@mshober

mshober commented Nov 10, 2023

Thanks so much for your feedback @o-nikolas.

I'll hopefully have some time this weekend to tackle some of those tasks.
