
Conversation

@jscheffl
Contributor

@jscheffl jscheffl commented Oct 7, 2025

So far the Edge worker running remotely uses a single-threaded loop to fetch, process, and monitor tasks, upload result logs, and send heartbeats. This limits the parallelism achievable on an Edge Worker - how many tasks can be handled in parallel.

With this PR a major refactor is made to use Python AsyncIO for task handling in order to improve concurrency with many tasks.
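For illustration only - a hedged sketch of the idea, not this PR's actual code, and the coroutine names are made up - the core change is to let one asyncio event loop interleave the worker's periodic duties instead of serializing them in a single blocking loop:

import asyncio

async def fetch_and_launch_tasks():
    # Placeholder body: ask the API server for new work and launch it.
    while True:
        await asyncio.sleep(5)

async def send_heartbeat():
    # Placeholder body: report worker state and running tasks.
    while True:
        await asyncio.sleep(10)

async def upload_logs():
    # Placeholder body: push incremental task logs.
    while True:
        await asyncio.sleep(5)

async def main():
    # All duties run concurrently in one process; a slow log upload no
    # longer delays the heartbeat or the next task fetch.
    await asyncio.gather(fetch_and_launch_tasks(), send_heartbeat(), upload_logs())

if __name__ == "__main__":
    asyncio.run(main())

In the real worker the placeholder bodies would of course call the Edge API; the sketch only shows the concurrency structure.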

Note: Before merging this needs proper review and testing, as a lot of logic and libraries change, with the risk of degraded quality/stability due to implementation glitches. UPDATE: WIP status still left on, but I did a lot of (local) testing and I think it is ready now.

Tested with 100 concurrent tasks (mostly sleeping) on my machine, and the worker used only 10-20% of CPU (compared to the 10GB RAM all the tasks needed). So with the re-implementation in AsyncIO the worker scales much further than before, where I estimated that 10-15 tasks could be handled.

FYI @dheerajturaga - As you are also a user: committer status still pending, but looking forward to a review!
@dabla As you do a lot of AsyncIO on your side, looking forward to a review from you as well!
@AutomationDev85 - Would also like your feedback!

@boring-cyborg boring-cyborg bot added area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3 labels Oct 7, 2025
@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch from 16dce9a to e5de8ad Compare October 12, 2025 00:14
@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch from e5de8ad to 5a0ef71 Compare November 11, 2025 21:13
@dabla
Contributor

dabla commented Dec 12, 2025

That will be a nice improvement @jscheffl. Looking forward to this!

@dabla
Contributor

dabla commented Dec 12, 2025

I've just checked the code and I see a possible "improvement" regarding the following code in the worker:

if worker_info.state == EdgeWorkerState.MAINTENANCE_REQUEST:
    logger.info("Maintenance mode requested!")
    EdgeWorker.maintenance_mode = True
elif (
    worker_info.state in [EdgeWorkerState.IDLE, EdgeWorkerState.RUNNING]
    and EdgeWorker.maintenance_mode
):
    logger.info("Exit Maintenance mode requested!")
    EdgeWorker.maintenance_mode = False
if EdgeWorker.maintenance_mode:
    EdgeWorker.maintenance_comments = worker_info.maintenance_comments
else:
    EdgeWorker.maintenance_comments = None
if worker_info.state == EdgeWorkerState.SHUTDOWN_REQUEST:
    logger.info("Shutdown requested!")

We could encapsulate the state checks in the WorkerInfo dataclass through properties; then the above code would be easier to read and also to test, as you could test the state check directly on the WorkerInfo dataclass. So I would add the following properties to the WorkerInfo dataclass:

@property
def is_maintenance(self) -> bool:
    return self.state == EdgeWorkerState.MAINTENANCE_REQUEST

@property
def is_running_or_idle(self) -> bool:
    return self.state in [EdgeWorkerState.IDLE, EdgeWorkerState.RUNNING]

@property
def is_shutdown(self) -> bool:
    return self.state == EdgeWorkerState.SHUTDOWN_REQUEST

Then you could rewrite the code as below and add dedicated tests for the above properties on the WorkerInfo dataclass:

if worker_info.is_maintenance:
    logger.info("Maintenance mode requested!")
    EdgeWorker.maintenance_mode = True
elif worker_info.is_running_or_idle and EdgeWorker.maintenance_mode:
    logger.info("Exit Maintenance mode requested!")
    EdgeWorker.maintenance_mode = False
if EdgeWorker.maintenance_mode:
    EdgeWorker.maintenance_comments = worker_info.maintenance_comments
else:
    EdgeWorker.maintenance_comments = None
if worker_info.is_shutdown:
    logger.info("Shutdown requested!")
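A minimal pytest sketch of such dedicated tests could look like this (the import paths and the ability to construct WorkerInfo from just a state are assumptions here, not taken from the PR):

import pytest

# Assumed import locations - adjust to the actual edge3 provider modules.
from airflow.providers.edge3.models.edge_worker import EdgeWorkerState
from airflow.providers.edge3.worker.worker import WorkerInfo

@pytest.mark.parametrize(
    "state, expected",
    [
        (EdgeWorkerState.MAINTENANCE_REQUEST, True),
        (EdgeWorkerState.IDLE, False),
        (EdgeWorkerState.RUNNING, False),
    ],
)
def test_is_maintenance(state, expected):
    # Constructing WorkerInfo with only a state is assumed for brevity.
    assert WorkerInfo(state=state).is_maintenance is expected

def test_is_shutdown():
    assert WorkerInfo(state=EdgeWorkerState.SHUTDOWN_REQUEST).is_shutdown is True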

WDYT?

@jscheffl
Contributor Author

WDYT?

Yes, the code is not perfect and the state management has grown over time. I am also not 100% happy about it. If you have a good idea and some time... as with everything: contributions are welcome.

@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch 5 times, most recently from ad8fc74 to 53921f2 Compare January 3, 2026 22:38
@dabla
Contributor

dabla commented Jan 6, 2026

Really like the refactoring being done here, looking forward to testing it once it's done ;-)

@jscheffl
Contributor Author

jscheffl commented Jan 6, 2026

Almost ready for review; I need to hunt for one bug that I saw (and had no time, was distracted by family). I hope I can make it ready for review by EOB today.

@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch from 53921f2 to 507ba20 Compare January 6, 2026 20:37
@jscheffl jscheffl requested a review from dabla January 6, 2026 20:40
@jscheffl jscheffl marked this pull request as ready for review January 6, 2026 20:41
@dheerajturaga
Member

@jscheffl I'm unable to log in to Airflow on this branch. I launched Airflow with breeze start-airflow -d -e --executor EdgeExecutor and I'm seeing this error

2026-01-07T12:02:29.527089Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114
INFO:     172.18.0.1:39458 - "POST /auth/token HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 416, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", line 1135, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", line 107, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 29, in __call__
    await responder(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 130, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 46, in __call__
    await self.app(scope, receive, self.send_with_compression)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 198, in __call__
    raise app_exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 144, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 716, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 736, in app
    await route.handle(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 462, in handle
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", line 1135, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", line 107, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 716, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 736, in app
    await route.handle(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 290, in handle
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 115, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 101, in app
    response = await f(request)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 345, in app
    solved_result = await solve_dependencies(
  File "/usr/python/lib/python3.10/site-packages/fastapi/dependencies/utils.py", line 643, in solve_dependencies
    solved = await call(**solved_result.values)
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/managers/simple/utils.py", line 34, in parse_login_body
    body = await request.json()
  File "/usr/python/lib/python3.10/site-packages/starlette/requests.py", line 251, in json
    self._json = json.loads(body)
  File "/usr/python/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/python/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/python/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
2026-01-07T12:02:29.543434Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114
INFO:     172.18.0.1:39466 - "GET / HTTP/1.1" 200 OK
2026-01-07T12:02:29.713162Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114
2026-01-07T12:02:29.713782Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114
INFO:     172.18.0.1:39466 - "GET /ui/config HTTP/1.1" 403 Forbidden
2026-01-07T12:02:29.727864Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114
INFO:     172.18.0.1:39466 - "GET /api/v2/auth/login?next=http%3A%2F%2Flocalhost%3A28080%2F HTTP/1.1" 307 Temporary Redirect
2026-01-07T12:02:29.733658Z [error    ] JWT token is not valid: Not enough segments [airflow.api_fastapi.auth.managers.base_auth_manager] loc=base_auth_manager.py:114

@jscheffl jscheffl requested a review from dheerajturaga January 7, 2026 18:30
@jscheffl
Contributor Author

jscheffl commented Jan 7, 2026

Aaah @dheerajturaga, now I am able to add you as a reviewer... seems the write permissions have landed! Cool!

@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch from fe8bca1 to f0bf876 Compare January 7, 2026 21:11
@jscheffl
Contributor Author

jscheffl commented Jan 7, 2026

@jscheffl I'm unable to log in to Airflow on this branch. I launched Airflow with breeze start-airflow -d -e --executor EdgeExecutor and I'm seeing this error

Actually, I can confirm I had the same. I switched to FabAuthManager and then it worked. Somehow I was not able to reproduce this in main and therefore rebased my PR onto latest main. Not able to get the error again. And I am not sure how this can be reproduced - for sure not via the EdgeExecutor, as this PR does not change anything on the auth managers.

What I saw is that the cookie _token was set to the value undefined (as a string, not the variable!) and then somehow the parsing failed. But I am not sure which side condition can cause this and also cannot see a PR that fixed it. If somebody is able to reproduce it, I would like to contribute a fix.
From the error stack it seems the browser sent a POST request with MIME type JSON but without a JSON document body.

@dheerajturaga
Member

@jscheffl I'm unable to log in to Airflow on this branch. I launched Airflow with breeze start-airflow -d -e --executor EdgeExecutor and I'm seeing this error

Actually, I can confirm I had the same. I switched to FabAuthManager and then it worked. Somehow I was not able to reproduce this in main and therefore rebased my PR onto latest main. Not able to get the error again. And I am not sure how this can be reproduced - for sure not via the EdgeExecutor, as this PR does not change anything on the auth managers.

What I saw is that the cookie _token was set to the value undefined (as a string, not the variable!) and then somehow the parsing failed. But I am not sure which side condition can cause this and also cannot see a PR that fixed it. If somebody is able to reproduce it, I would like to contribute a fix. From the error stack it seems the browser sent a POST request with MIME type JSON but without a JSON document body.

Yup, the rebase fixed this issue

@dheerajturaga
Member

The feature is working; I was able to run tasks on the edge worker as well as test whether the worker responds correctly to my requests to change states from the edge worker dashboard.

I have not had much luck scale testing this, however; having 100 concurrent tasks running seems to overload my laptop (behavior is consistent without this PR as well).

@jscheffl jscheffl force-pushed the feature/make-edge-worker-using-async-loop branch from f0bf876 to 5221219 Compare January 8, 2026 21:25
@jscheffl
Contributor Author

jscheffl commented Jan 8, 2026

I have not had much luck scale testing this, however; having 100 concurrent tasks running seems to overload my laptop (behavior is consistent without this PR as well).

Haha, yeah, running ~125 tasks took ~12GB RAM on top of the rest on my laptop (each task just a Python sleep()) - but well, at least a factor of 10 more than the previous implementation.

@dabla
Contributor

dabla commented Jan 8, 2026

I have not had much luck scale testing this, however; having 100 concurrent tasks running seems to overload my laptop (behavior is consistent without this PR as well).

Haha, yeah, running ~125 tasks took ~12GB RAM on top of the rest on my laptop (each task just a Python sleep()) - but well, at least a factor of 10 more than the previous implementation.

How come the sudden increase? That’s huge

@jscheffl
Contributor Author

jscheffl commented Jan 8, 2026

I have not had much luck scale testing this, however; having 100 concurrent tasks running seems to overload my laptop (behavior is consistent without this PR as well).

Haha, yeah, running ~125 tasks took ~12GB RAM on top of the rest on my laptop (each task just a Python sleep()) - but well, at least a factor of 10 more than the previous implementation.

How come the sudden increase? That’s huge

The worker forks a Python process per supervisor, and each supervisor forks another Python process to separate the workload. 125 running tasks mean 1+125*2 == 251 processes: 125 sleeping, 125+1 sending heartbeats to the API.

Subtracting the worker, I think this is actually very lean: 12GB/125 workloads == ~98MB per workload == <50MB per Python interpreter (while benefiting from COW, as the process manager shows ~250MB/process in the task manager).

I assume some memory can be saved if we also implement the gc-freeze like in #58365 - I did not apply it in this PR, but it would be the same here.

A major saving would be if the supervisor gained async capabilities such that the supervision of multiple tasks runs all in one process in async loops - but this would be less crash resistant and a major rework of the supervisor. Maybe also in the future...
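For reference, a minimal sketch of the gc.freeze pattern mentioned above (illustrative only, assuming a plain fork-per-task model; this is not code from this PR or from #58365):

import gc
import os

# Freeze long-lived parent objects before forking so the child's garbage
# collector does not touch (and thereby copy-on-write duplicate) them.
gc.disable()
gc.freeze()

pid = os.fork()
if pid == 0:
    gc.enable()  # child collects only the objects it creates itself
    # ... run the supervised task workload here ...
    os._exit(0)
else:
    gc.enable()
    os.waitpid(pid, 0)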

@dabla
Contributor

dabla commented Jan 9, 2026

I have not had much luck scale testing this, however; having 100 concurrent tasks running seems to overload my laptop (behavior is consistent without this PR as well).

Haha, yeah, running ~125 tasks took ~12GB RAM on top of the rest on my laptop (each task just a Python sleep()) - but well, at least a factor of 10 more than the previous implementation.

How come the sudden increase? That’s huge

The worker forks a Python process per supervisor, and each supervisor forks another Python process to separate the workload. 125 running tasks mean 1+125*2 == 251 processes: 125 sleeping, 125+1 sending heartbeats to the API.

Subtracting the worker, I think this is actually very lean: 12GB/125 workloads == ~98MB per workload == <50MB per Python interpreter (while benefiting from COW, as the process manager shows ~250MB/process in the task manager).

I assume some memory can be saved if we also implement the gc-freeze like in #58365 - I did not apply it in this PR, but it would be the same here.

A major saving would be if the supervisor gained async capabilities such that the supervision of multiple tasks runs all in one process in async loops - but this would be less crash resistant and a major rework of the supervisor. Maybe also in the future...

Makes sense.

@dheerajturaga dheerajturaga merged commit 56d70e7 into apache:main Jan 9, 2026
130 checks passed
@jscheffl
Contributor Author

jscheffl commented Jan 9, 2026

@dheerajturaga Thanks for merging and congrats on your first merged PR!

OscarLigthart pushed a commit to OscarLigthart/airflow that referenced this pull request Jan 9, 2026
* First draft of async edge worker

* Further async implementation

* Remove TODO

* Uuups, remove example

* Full rewrite in Python async

* Fix pytests after async re-write

* Revert async implementation to use multiprocessing for supervisor

* Ensure running processes are not killed on CTRL+C

* Fix signal handler, make it proper async

* Fix pytest

* Review feedback by dabla
@dheerajturaga
Member

@jscheffl, I was able to scale-test apache-airflow-providers-edge3 3.0.0rc1 (this pull request) and here are my findings.

I was able to load the edge worker with 100 concurrent tasks; I see that the memory utilization spikes to 5GB and slowly starts shrinking as tasks finish. This is significantly better than Celery, which jumps to 20+GB at 100 concurrent tasks!

Also, as the number of tasks queued on the worker increases, the memory consumption of the worker also increases. With 100 tasks running and 400 tasks queued on the same worker, the memory spikes to 13GB.

Overall, this is a significant improvement! Thanks for implementing it!

I was also able to push the boundaries with 400 parallel tasks, and I see memory consumption of 18GB. However, some tasks start to fail due to socket limitations. I think we can safely claim a concurrency of 100 if the machine allows it.

@dabla
Contributor

dabla commented Jan 15, 2026

@jscheffl, I was able to scale-test apache-airflow-providers-edge3 3.0.0rc1 (this pull request) and here are my findings.

I was able to load the edge worker with 100 concurrent tasks; I see that the memory utilization spikes to 5GB and slowly starts shrinking as tasks finish. This is significantly better than Celery, which jumps to 20+GB at 100 concurrent tasks!

Also, as the number of tasks queued on the worker increases, the memory consumption of the worker also increases. With 100 tasks running and 400 tasks queued on the same worker, the memory spikes to 13GB.

Overall, this is a significant improvement! Thanks for implementing it!

I was also able to push the boundaries with 400 parallel tasks, and I see memory consumption of 18GB. However, some tasks start to fail due to socket limitations. I think we can safely claim a concurrency of 100 if the machine allows it.

@dheerajturaga Interesting, thanks for sharing; this is very valuable information, as I haven't had time to test it yet.

@jscheffl
Contributor Author

Overall, this is a significant improvement! Thanks for implementing it!

Cool! Thanks for confirming my tests! Yes, this was my long-outstanding aim!

But as LocalExecutor was also optimized via gc.freeze, I assume memory can still be optimized down a bit. And with the limits on sockets... yeah, at such a level of hundreds of tasks I assume some kernel parameters demand tuning as well. But for 400 concurrent tasks I assume we rather need to start thinking about supporting deferred mode on edge as well, which would be a major next cool increment!

Would it not be cool if the edge worker caught the DeferredException and put the triggerer actions into the async loop of the worker... almost zero effort and memory needed, and then handed work back to the normal supervisor when returning from the deferred state? Would need some changes in core or the SDK as well...

But yeah we need to have some dreams...

@dabla
Contributor

dabla commented Jan 15, 2026

Overall, this is a significant improvement! Thanks for implementing it!

Cool! Thanks for confirming my tests! Yes, this was my long-outstanding aim!

But as LocalExecutor was also optimized via gc.freeze, I assume memory can still be optimized down a bit. And with the limits on sockets... yeah, at such a level of hundreds of tasks I assume some kernel parameters demand tuning as well. But for 400 concurrent tasks I assume we rather need to start thinking about supporting deferred mode on edge as well, which would be a major next cool increment!

Would it not be cool if the edge worker caught the DeferredException and put the triggerer actions into the async loop of the worker... almost zero effort and memory needed, and then handed work back to the normal supervisor when returning from the deferred state? Would need some changes in core or the SDK as well...

That would just be awesome Jens!!! The edge executor is already a huge step forward, I’m sure one day that “dream” will become reality ;-)

@potiuk
Member

potiuk commented Jan 18, 2026

That would just be awesome Jens!!! The edge executor is already a huge step forward, I’m sure one day that “dream” will become reality ;-)

Good dreams :) .
