-
Notifications
You must be signed in to change notification settings - Fork 301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async type engine #2752
Async type engine #2752
Conversation
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
…dle the fact that it itself can be called from an async function, thus enabling the TypeEngine to return a future, which it currently doesn't handle Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
flytekit/core/type_engine.py
Outdated
return fut | ||
else: | ||
coro = self.async_to_literal(ctx, python_val, python_type, expected) | ||
return asyncio.run(coro) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
everywhere where run is called will need to be swapped out for whatever async running mechanism we decide on.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2752 +/- ##
===========================================
- Coverage 78.51% 46.84% -31.68%
===========================================
Files 262 257 -5
Lines 23411 23227 -184
Branches 3976 4005 +29
===========================================
- Hits 18382 10881 -7501
- Misses 4314 12231 +7917
+ Partials 715 115 -600 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Change some functions in base_task and promise to async Add some helper functions Use a contextvars aware executor Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an issue/write up that describes the user or developer facing benefits of having a async type engine?
@@ -29,6 +29,7 @@ | |||
from flytekit.tools.module_loader import load_object_from_module | |||
from flytekit.types.pickle import pickle | |||
from flytekit.types.pickle.pickle import FlytePickleTransformer | |||
from flytekit.utils.asyn import loop_manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be imported as run_sync
and then use that? (I prefer to hide the loop manager as much as possible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess so. i was hoping to one day figure out type hinting, which doesn't work today in either case, but i think will never work with the run sync way. I tried briefly getting functool wraps to work but alas to no avail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ohh it was missing a paramspec. it works now. and changed promise.py and base_task.py to just use the run_sync.
translate_inputs_to_literals = loop_manager.synced(_translate_inputs_to_literals) | ||
|
||
|
||
async def resolve_attr_path_in_promise(p: Promise) -> Promise: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be concerned with backward compatibility with changing a function from sync to async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this particular function is pretty limited in terms of callees so hopefully that's okay.
flytekit/core/promise.py
Outdated
if "no running event loop" not in str(e): | ||
logger.error(f"Unknown RuntimeError {str(e)}") | ||
raise | ||
binding_data = asyncio.run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the issue with using sync_run
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Not currently no, I did some preliminary local testing and noticed an improvement so it's something I think is worth doing. I'm planning on putting together more performance testing in general though for the release notes covering a variety of scenarios. But ultimately speed is the main driver ofc. |
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is amazing. Really. The first step in the next round of innovation in flytekit. Thank you for working on this.
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Changes
This PR converts the core functions in the TypeEngine to async, adding synchronized functions where needed.
async_to_python_value
to `TypeEngine`.flytekit.logger
) that will copy contextvars. Without this, we would lose theFlyteContext
information.tools
to a new folder calledutils
to avoid circular dependencies (tools already import core).Notes
Noticed a couple issues while developing this. Most of these notes are more relevant to the pr that introduced a #2784 already merged.
Event loop overriding
We ran into this first with a bug in eager mode where we noticed that when certain libraries were used in conjunction with eager mode, those libraries would break. What was happening was that that library had on load, created and installed an event loop on the main thread. When flytekit called
asyncio.run
to run an eager workflow,run
removed and effectively destroyed that event loop, replacing it with its own, and thereafter cleaning it up. The fact that any library, as well as user code, at any point, is liable to callasyncio.run
means that at any point, the loop that flytekit relies on can be destroyed if not properly protected.Inherent recursive behavior
The nature of what the type engine does is very recursive. For example, the Union transformer recursively may call the List transformer which may then call another transformer. Because we don't want to force users to only write async transformers from now on, and because users may provide for their own list/union/dict-like transformers that recursively call the type engine, a common scenario that we have to account for is
This seemingly innocuous pattern is actually a bit tricky to account for. Typically when you call async code for the first time, you'd use for something like
asyncio.run
orloop.run_until_complete
. These root level async entrypoints are only available if they haven't been called before (or have finished). That is there can only be one of these calls, for a given thread, at any point. At the point of the second async call in the scenario above, the invoking synchronous function cannot call these root async calls again.To get around this there's two main solutions.
asyncio.get_running_loop
must raise aRuntimeError
). Other synchronous functions, like the second one in the above scenario, must also return anasyncio.Future
, in addition to the original return type of the function.The advantage of the first approach is single-threaded-ness, along with not introducing additional blocking. The advantage of the second approach is that you don't have functions that randomly can't be called from async functions, and other functions that randomly return a Union of a future. The approach that we decided to pursue with the loop manager is the second approach. Using the loop manager code, instead of calling
asyncio.run
we can callloop_manager.run_sync
everywhere. Haven't yet been able to get signal handlers to work to handle loop termination but that should be okay since we don't need that yet.Still todo (post-merge):
How was this patch tested?
Unit testing and tested locally and by running workflows on dev cluster. The scope of this PR is such that it will need a fair bit of alpha candidate testing. Something that we can do post-merge, esp in conjunction with some of the other larger changes slated for this cycle.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link