Two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues #60108
As I understand it, this was removed in #55506 in favor of a middleware that refreshes the token. Are you running an execution-API-only instance separately from the api-server? Could this middleware approach be extended to task-sdk calls too?
Hi @tirkarthi, I took a stab at extending that pattern in #60197, handling expired tokens transparently in JWTBearer + middleware so no client-side changes are needed. Would love your thoughts on it. Totally happy to go with whichever approach the team feels is better!
Would love to hear @ashb's or @amoghrajesh's opinion on this one.
ashb
left a comment
We can't do this approach. It lets any Execution API token be resurrected which fundamentally breaks lots of security assumptions -- it amounts to having tokens not expire. That is bad.
Instead, what we should do is generate a new token (i.e. one with an extra/different set of JWT claims) that is only valid for the /run endpoint and valid for longer (say 24 hours, make it configurable), and this is what gets sent in the workload.
The run endpoint would then set the header to give the running task a "short lived" token (the one we have right now, basically) that is usable on the rest of the Execution API. This approach is safer, as the existing controls in the /run endpoint already prevent a task being run more than once, which should also protect against "resurrecting" an expired token and using it to access things like connections etc. And we should validate that the token used on all endpoints but run explicitly lacks this new claim.
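To make the proposed flow concrete, here is a minimal standard-library sketch, not the PR's implementation. The claim name `scope`, its two values, the TTLs, the `_started` set and every function name are assumptions made for illustration; the real Execution API uses its own JWT generator and validator.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-secret-for-illustration-only"  # hypothetical; real code reads a configured secret
WORKLOAD_SCOPE = "workload"    # hypothetical claim value: token only valid for /run
EXECUTION_SCOPE = "execution"  # hypothetical claim value: short-lived token for everything else


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def issue(sub: str, scope: str, ttl: int, now: float | None = None) -> str:
    """Sign an HS256 JWT carrying the task id, a scope claim and an expiry."""
    now = time.time() if now is None else now
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps({"sub": sub, "scope": scope, "exp": int(now) + ttl}).encode())
    sig = hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64(sig)}"


def verify(token: str, required_scope: str, now: float | None = None) -> dict:
    """Check signature, expiry and scope; raise PermissionError on any failure."""
    now = time.time() if now is None else now
    header, payload, sig = token.split(".")
    expected = hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _unb64(sig)):
        raise PermissionError("bad signature")
    claims = json.loads(_unb64(payload))
    if claims["exp"] <= now:
        raise PermissionError("token expired")
    if claims.get("scope") != required_scope:
        raise PermissionError("wrong scope for this endpoint")
    return claims


_started: set[str] = set()  # stands in for the existing "task may only start once" control


def run_endpoint(token: str, now: float | None = None) -> str:
    """Accept only the long-lived workload token and hand back a short-lived one."""
    claims = verify(token, WORKLOAD_SCOPE, now)
    if claims["sub"] in _started:
        raise PermissionError("task already started")
    _started.add(claims["sub"])
    return issue(claims["sub"], EXECUTION_SCOPE, ttl=600, now=now)


def other_endpoint(token: str, now: float | None = None) -> str:
    """Any non-/run endpoint: rejects tokens that carry the workload scope."""
    return verify(token, EXECUTION_SCOPE, now)["sub"]
```

In this sketch the workload token can sit in a queue for hours, yet it only ever unlocks one successful call to `run_endpoint`; every other endpoint still sees a token that expires quickly.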
ashb
left a comment
Much better approach, and on the right track, thanks.
Some changes though:
- "queue" is not the right thing to use, as these tokens could be used for executing other workloads soon (for instance, we have already talked about wanting Dag level callbacks to be executed on the workers, not in the dag processor, which would be done by having a new type from the ExecuteTaskWorkload). So maybe we have "scope": "ExecuteTaskWorkload"?
- A little bit of refactoring is needed before we are ready to merge this.
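One way to picture the suggested "scope" claim is a value that names the workload type, so new workload types can be added without redefining what the token means. The sketch below is only an illustration: `WorkloadScope`, `scope_allows` and the `ExecuteCallbackWorkload` value are hypothetical names, not part of Airflow.

```python
from enum import Enum


class WorkloadScope(str, Enum):
    EXECUTE_TASK = "ExecuteTaskWorkload"
    # Hypothetical future workload type, e.g. Dag level callbacks run on workers.
    EXECUTE_CALLBACK = "ExecuteCallbackWorkload"


def scope_allows(claims: dict, accepted: set) -> bool:
    """Return True when the token's scope claim names an accepted workload type."""
    try:
        return WorkloadScope(claims.get("scope")) in accepted
    except ValueError:
        # Missing or unknown scope values are never accepted.
        return False
```

An endpoint that starts task instances would accept only `WorkloadScope.EXECUTE_TASK`, while a later callback endpoint could accept its own value without touching the task path.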
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Please ping me directly when you want me to re-review this.
Hi @ashb, thank you so much for taking the time to review this PR and for your valuable guidance. I truly appreciate the feedback. I've made the updates based on your suggestions, and the PR is ready for another look whenever you have time. Thanks again for your patience and support!
    logger.debug(
        "`api_auth/jwt_secret` is not set, generating a temporary one for in-process execution"
    )
    conf.set("api_auth", "jwt_secret", urlsafe_b64encode(os.urandom(16)).decode())
os.urandom shouldn't be used for anything security related -- please use https://docs.python.org/3/library/secrets.html, likely secrets.token_urlsafe(16)
That said, I'm a bit surprised we need to do this at all here -- isn't this done elsewhere already?
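For reference, the suggested replacement could look like the sketch below. The helper name `temporary_jwt_secret` is hypothetical; `secrets.token_urlsafe` is the standard-library call named in the comment above, and it already returns URL-safe text, so no separate base64 step is needed.

```python
import secrets


def temporary_jwt_secret(nbytes: int = 16) -> str:
    """Return a random URL-safe text secret built from `nbytes` random bytes."""
    return secrets.token_urlsafe(nbytes)
```

The result could then be passed to `conf.set("api_auth", "jwt_secret", ...)` in place of the `urlsafe_b64encode(os.urandom(16)).decode()` expression.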
From my understanding, this is only for InProcessExecutionAPI which is used when running tasks without a separate API server - it generates a temporary secret if none is configured. I'm still exploring this part of the codebase, so please do correct me if I've misunderstood something.
    from airflow.api_fastapi.execution_api.deps import _container


    class InProcessContainer:
        """Minimal container for in-process execution, bypassing svcs lifecycle."""
Why can't we use svcs? Why do we need to implement our version of it?
I'm still getting familiar with the codebase, but from what I understand, I added this to fix ServiceNotFoundError failures in CI. With InProcessExecutionAPI, the svcs lifespan runs later (when transport is accessed), but services like JWTGenerator are needed before that. This container bypasses the lifecycle and returns pre-created instances from app.state. I may well be missing something - if there's a cleaner pattern you'd recommend, I'd really appreciate the guidance.
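As a rough picture of the pattern described in this reply, a container that hands back pre-created instances instead of resolving them through a lifespan might look like the sketch below. It does not use svcs and is not the PR's InProcessContainer; `PrecreatedContainer` and `FakeJWTGenerator` are hypothetical stand-ins.

```python
class FakeJWTGenerator:
    """Hypothetical stand-in for a service that must exist before the app lifespan starts."""


class PrecreatedContainer:
    """Returns instances that were built eagerly, keyed by their type."""

    def __init__(self, instances: dict):
        self._instances = dict(instances)

    def get(self, svc_type):
        try:
            return self._instances[svc_type]
        except KeyError:
            raise LookupError(f"no pre-created instance for {svc_type.__name__}") from None


generator = FakeJWTGenerator()
container = PrecreatedContainer({FakeJWTGenerator: generator})
```

The trade-off raised in the review still applies: a hand-rolled lookup like this sidesteps whatever cleanup and health-check behavior the real service container provides.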
    JWTBearerWorkloadDep = Depends(JWTBearerWorkloadScope(path_param_name="task_instance_id"))


    ti_run_router = VersionedAPIRouter(dependencies=[JWTBearerWorkloadDep])
We shouldn't need this extra router when we already have router; we can declare the JWTBearerWorkloadDep on the path instead.
I'm not sold yet on the way we are handling the dependencies/multiple routers etc. Let me have a look at what options we have in FastAPI, as I feel this should be possible in a nicer manner.
I'm seeing if we can make use of https://fastapi.tiangolo.com/advanced/security/oauth2-scopes/ or something like it.
That sounds like a great approach! I'd be really grateful if you could share your findings on FastAPI OAuth2 scopes - happy to refactor based on your recommendations. Thank you so much for taking the time to look into this!
Tasks waiting in Celery queue may have their JWT tokens expire before execution starts.
Implements a two-token mechanism for task execution to prevent token expiration while tasks wait in executor queues.
Changes
Fixes: #53713
Summary
Fixes #59553 - Tasks waiting in Celery queue fail when JWT tokens expire before execution starts.