Raise import error if a task uses a non-existent pool #21829

Closed
wants to merge 4 commits into from

ephraimbuddy
Contributor

Closes: #20788

This PR proposes raising an import error if a task in a DAG uses
a non-existent pool. This frees the scheduler from continuously
trying to schedule a task whose pool does not exist.
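
For readers skimming the thread, here is a minimal sketch of the kind of parse-time check being proposed. It is not the PR's implementation: `UnknownPoolError`, `find_unknown_pools` and `check_task_pools` are hypothetical names, and the set of existing pools is passed in directly instead of being read from the metadata database.

```python
class UnknownPoolError(Exception):
    """Hypothetical error type; the PR's real exception is not shown in this thread."""


def find_unknown_pools(task_pools, existing_pools):
    """Return {task_id: pool} for every task whose pool is not in existing_pools."""
    return {
        task_id: pool
        for task_id, pool in task_pools.items()
        if pool not in existing_pools
    }


def check_task_pools(task_pools, existing_pools):
    """Raise at parse time if any task references a pool that does not exist."""
    unknown = find_unknown_pools(task_pools, existing_pools)
    if unknown:
        details = ", ".join(
            f"{task_id!r} -> {pool!r}" for task_id, pool in sorted(unknown.items())
        )
        raise UnknownPoolError(f"Tasks use non-existent pools: {details}")
```

Failing while the DAG file is parsed, rather than while the scheduler is queueing, is what would surface the problem as an import error.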



@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 26, 2022
@@ -43,7 +43,7 @@ def fail():
 dag1_task1 = DummyOperator(
     task_id='test_backfill_pooled_task',
     dag=dag1,
-    pool='test_backfill_pooled_task_pool',
+    pool='default_pool',
Contributor Author

I don't think the name of the pool is relevant here, so I changed this to an existing pool name. With git blame, I saw that when this file was created (#1271), there was no default_pool for tasks.

Happy to hear your thoughts on this.

Member

Does anyone know the context when #1225 was filed? Was it possible to schedule a task without a pool back then?

@@ -2625,6 +2625,17 @@ def validate_schedule_and_params(self):
                 "DAG Schedule must be None, if there are any required params without default values"
             )

+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
Member

@uranusjr uranusjr Feb 28, 2022

Let's annotate this? We probably also want to add :meta private: so this does not become public API.

ephraimbuddy and others added 3 commits February 28, 2022 18:36
This PR proposes to raise import error if a task in a DAG is using
a non-existent pool. This will free the scheduler from continuously
trying to schedule a task with a non-existing pool

fixup! Raise import error if a task uses a non-existent pool

fixup! Raise import error if a task uses a non-existent pool
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Contributor

@dstandish dstandish left a comment

I think it's a good idea to improve messaging around this, but I'm not sure of the best way. I think that ideally we allow the dag to exist, but maybe just bubble up a "flash" message alerting with the offending dag and task.

One reason is, if you are developing locally, you might want to use airflow dags list to verify your dag parses ok. Or you might want to run airflow dags test or airflow tasks test to run your task, without creating the pool. And what if perhaps you remove the pool to "temporarily disable" a set of tasks. Maybe in this case it's better to alert about "misconfiguration" but allow the dag to remain in the system.

WDYT?

@ReadytoRocc might also have helpful thoughts here, I know he has thought a bit about pools.

Related: maybe we should also warn if the configured pool has size zero?

@ephraimbuddy
Contributor Author

> I think it's a good idea to improve messaging around this, but I'm not sure of the best way. I think that ideally we allow the dag to exist, but maybe just bubble up a "flash" message alerting with the offending dag and task.

The problem with the dag existing is that the scheduler would keep on trying to queue it. If you check the linked issue, it blocks some other tasks from being queued.

> One reason is, if you are developing locally, you might want to use airflow dags list to verify your dag parses ok. Or you might want to run airflow dags test or airflow tasks test to run your task, without creating the pool. And what if perhaps you remove the pool to "temporarily disable" a set of tasks. Maybe in this case it's better to alert about "misconfiguration" but allow the dag to remain in the system.

I thought about not creating a dagrun if a task has a wrong pool, but it looks like raising an error at that point could keep the scheduler crash-looping. Also, since a task without a pool is assigned the default_pool by default, I think it's OK to raise an import error if a task has a wrong pool.
This is something that can cause issues for the scheduler and it is a user error, which is why I think raising the error is better.

Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
@dstandish
Contributor

> I think it's a good idea to improve messaging around this, but I'm not sure of the best way. I think that ideally we allow the dag to exist, but maybe just bubble up a "flash" message alerting with the offending dag and task.

> The problem with the dag existing is that the scheduler would keep on trying to queue it. If you check the linked issue, it blocks some other tasks from being queued.

Ah, I see. Well, alternatively we could amend the query to filter on pool in pools, or join to the pools table.

> One reason is, if you are developing locally, you might want to use airflow dags list to verify your dag parses ok. Or you might want to run airflow dags test or airflow tasks test to run your task, without creating the pool. And what if perhaps you remove the pool to "temporarily disable" a set of tasks. Maybe in this case it's better to alert about "misconfiguration" but allow the dag to remain in the system.

> I thought about not creating a dagrun if a task has a wrong pool, but it looks like raising an error at that point could keep the scheduler crash-looping.

I'm not sure we'd need to throw an error there. Couldn't we just not create the dag run? Or in any case, if we filter on "has existent pool" in the query above, it wouldn't even be considered for scheduling.
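
To make the "filter on has existent pool" idea concrete, here is a rough sketch using an in-memory SQLite database. The two tables are simplified stand-ins with assumed names and columns, not Airflow's real schema, and the query is only meant to show the shape of the join, not the scheduler's actual query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified, assumed tables for illustration only.
    CREATE TABLE slot_pool (pool TEXT PRIMARY KEY, slots INTEGER);
    CREATE TABLE task_instance (task_id TEXT, pool TEXT, state TEXT);
    INSERT INTO slot_pool VALUES ('default_pool', 128);
    INSERT INTO task_instance VALUES
        ('good_task', 'default_pool', 'scheduled'),
        ('bad_task', 'no_such_pool', 'scheduled');
    """
)

# The inner join drops task instances whose pool has no row in slot_pool,
# so they are never candidates for queueing.
schedulable = [
    row[0]
    for row in conn.execute(
        """
        SELECT ti.task_id
        FROM task_instance AS ti
        JOIN slot_pool AS sp ON sp.pool = ti.pool
        WHERE ti.state = 'scheduled'
        ORDER BY ti.task_id
        """
    )
]
print(schedulable)  # ['good_task']
```

With a filter like this the bad task is silently skipped, so it would still need some user-facing warning to explain why it never runs.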

Perhaps alongside import_errors we could add an attribute configuration_errors or something to DagBag and then use this to bubble up a flash alert like here.

If we had a configuration_errors thing, this would not be a hard error but something we'd want to warn user about. In this scenario we could also warn if pool has size 0. Thinking out loud a bit here.
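
Roughly what I have in mind, as a sketch only. `DagBagSketch` and `check_pools` are hypothetical, this is not Airflow's real DagBag, and `configuration_errors` is the proposed attribute rather than something that exists today.

```python
from dataclasses import dataclass, field


@dataclass
class DagBagSketch:
    """Hypothetical stand-in for DagBag, used only to illustrate the idea."""

    existing_pools: dict  # pool name -> number of slots
    import_errors: dict = field(default_factory=dict)
    configuration_errors: dict = field(default_factory=dict)  # proposed attribute

    def check_pools(self, dag_id, task_pools):
        """Record one warning per misconfigured task; never raises."""
        for task_id, pool in task_pools.items():
            if pool not in self.existing_pools:
                msg = f"task {task_id!r} uses non-existent pool {pool!r}"
            elif self.existing_pools[pool] == 0:
                msg = f"task {task_id!r} uses pool {pool!r}, which has size 0"
            else:
                continue
            self.configuration_errors.setdefault(dag_id, []).append(msg)
```

The dag stays in the bag and `import_errors` is untouched; the UI could read `configuration_errors` to show the flash alert.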

> Also, since a task without a pool is assigned the default_pool by default, I think it's OK to raise an import error if a task has a wrong pool. This is something that can cause issues for the scheduler and it is a user error, which is why I think raising the error is better.

Yeah I'm just thinking about the local developer experience.

idea

You know, maybe another way to address this concern while keeping your approach would be something like AIRFLOW__SCHEDULER__IGNORE_POOLS, which you could set to True for local dev, and then this would be a non-issue there. Locally the dag parses, and it runs, with no need to worry about pools.

another idea

What if instead of failing in this scenario, we create the pool with default size = 10 or something?

@dstandish
Contributor

dstandish commented Mar 2, 2022

Another comment along the same theme as my other comments: at a previous company I had unit tests testing "all dags parse without error". If we make this change, then we'd have to add a step to our CI testing process to create the pools (and recreate them locally to run tests locally). Small thing, but it could be inconvenient for folks. Disableable pools (or autocreated pools) would avoid this.

@ephraimbuddy
Contributor Author

> Ah, I see. Well, alternatively we could amend the query to filter on pool in pools, or join to the pools table.

In this case, the user won't know why the scheduler is not scheduling the task with a non-existent pool.

> I'm not sure we'd need to throw an error there. Couldn't we just not create the dag run? Or in any case, if we filter on "has existent pool" in the query above, it wouldn't even be considered for scheduling.

The DAG would keep coming up for the scheduler to create a dagrun even when ignored, and would block other eligible DAGs from creating dagruns.

> Perhaps alongside import_errors we could add an attribute configuration_errors or something to DagBag and then use this to bubble up a flash alert like here.

> If we had a configuration_errors thing, this would not be a hard error but something we'd want to warn user about. In this scenario we could also warn if pool has size 0. Thinking out loud a bit here.

We currently raise an import error if the pool size is less than 0, see here. That's why I still think this should be an import error. I could have verified the pool name right below that existing check, but because we still commit when we use provide_session, the scheduler would crash.

I may not fully understand the idea you are proposing. If you don't mind, could you create a PR so we can compare the two?

@dstandish
Contributor

dstandish commented Mar 3, 2022

> I may not fully understand the idea you are proposing. If you don't mind, could you create a PR so we can compare the two?

I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals. Locally it's common that you won't have all the pools, so it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably the concerns in the scheduler can be addressed by modifying queries to filter out TIs with bad pools).

But I think (1) being able to disable consideration (in the scheduler) of pools would make this work. And another thing that I think would work, and which would be really nice in general, would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in airflow.cfg or if you could do AIRFLOW_POOL_MY_POOL=1. With either or both of these in place, I think raising would be ok. It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.
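
As a sketch of option (2), assuming a purely hypothetical `AIRFLOW_POOL_<NAME>=<slots>` convention (Airflow has no such setting as far as this thread shows), pools could be read from the environment like this. `pools_from_env` and the lowercasing of the pool name are my own assumptions.

```python
import os

# Prefix taken from the example in the comment above; not an existing Airflow setting.
PREFIX = "AIRFLOW_POOL_"


def pools_from_env(environ=None):
    """Return {pool_name: slots} for every AIRFLOW_POOL_<NAME>=<int> variable."""
    environ = os.environ if environ is None else environ
    pools = {}
    for key, value in environ.items():
        if key.startswith(PREFIX) and len(key) > len(PREFIX):
            # Assumption: the pool name is the lowercased suffix of the variable name.
            pools[key[len(PREFIX):].lower()] = int(value)
    return pools
```

For example, `pools_from_env({"AIRFLOW_POOL_MY_POOL": "1"})` gives `{"my_pool": 1}`, and nothing needs to be recreated after a db reset.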

@ephraimbuddy
Contributor Author

> I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals. Locally it's common that you won't have all the pools, so it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably the concerns in the scheduler can be addressed by modifying queries to filter out TIs with bad pools).

I understand and I'm not arguing about it.

> But I think (1) being able to disable consideration (in the scheduler) of pools would make this work. And another thing that I think would work, and which would be really nice in general, would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in airflow.cfg or if you could do AIRFLOW_POOL_MY_POOL=1. With either or both of these in place, I think raising would be ok. It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.

I'll spend some time on it today to make different PRs based on the suggestions.

@ashb
Member

ashb commented Mar 3, 2022

The local development story could be (partially?) fixed by having dags test or tasks test not care about pools, but "normal" parsing should care and error?

@ephraimbuddy
Contributor Author

ephraimbuddy commented Mar 3, 2022

> The local development story could be (partially?) fixed by having dags test or tasks test not care about pools, but "normal" parsing should care and error?

Just saw that dags test requires a pool: if the pool does not exist, the task is not run. It fails at:

pool = session.query(models.Pool).filter(models.Pool.pool == task.pool).first()
if not pool:
    raise PoolNotFound(f'Unknown pool: {task.pool}')

tasks test also reports that dependencies are not met if the pool does not exist:

[2022-03-03 15:51:31,106] {taskinstance.py:1044} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_this_last __airflow_in_memory_dagrun__ [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'test-pool')

Tested on main

However, dags list is different. It shows all dags even if they have a wrong pool.

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 18, 2022
@ephraimbuddy ephraimbuddy deleted the nonexistent-pool branch April 21, 2022 22:09
Successfully merging this pull request may close these issues.

AIrflow Scheduler does not schedule any tasks when >max running tasks queued with non-existant pool
5 participants