[Data] Fix exception in async map #47110
    output_batch_queue.put(output_batch)
except Exception as e:
    exception_queue.put(e)
    output_batch_queue.put(None)  # Signal to stop processing
nit: could we use a constant or a more specific sentinel value instead of a generic None in this case?
I just changed it to a dict.
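For reference, the sentinel idea discussed above can be sketched roughly as follows. This is only an illustration, not the code in this PR: the names `_STOP`, `produce`, `consume`, and `run` are hypothetical, and the sketch assumes a plain `object()` compared by identity, so that values such as `None` or an empty dict can still pass through the queue as ordinary items.

```python
import queue
import threading

# Hypothetical name for illustration; not an identifier from the PR.
_STOP = object()  # unique marker, only ever compared by identity


def produce(items, out_queue):
    """Put every item on the queue, then the stop marker."""
    for item in items:
        out_queue.put(item)
    out_queue.put(_STOP)


def consume(out_queue):
    """Yield items from the queue until the stop marker is seen."""
    while True:
        item = out_queue.get()
        if item is _STOP:  # identity check, so None and {} are not mistaken for it
            break
        yield item


def run(items):
    """Run the producer in a thread and collect what the consumer sees."""
    q = queue.Queue()
    worker = threading.Thread(target=produce, args=(items, q))
    worker.start()
    results = list(consume(q))
    worker.join()
    return results
```

Because the check is `is _STOP` rather than an equality test, a value that merely compares equal to another object cannot end the loop early.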
@Bye-legumes Could you also add a unit test? You can base it off the repro in #47102, or the script you used to test. You can add it in
Added!
# Here, `out_batch` is a one-row output batch
# from the async generator, corresponding to a
# single row from the input batch.
out_batch = output_batch_queue.get()
if out_batch is None:
This should also be updated to SENTINEL, I think?
Right! Fixed, thanks!
@@ -318,13 +318,19 @@ def transform_fn(
# generators, and in the main event loop, yield them from
# the queue as they become available.
output_batch_queue = queue.Queue()
exception_queue = queue.Queue()
SENTINEL = dict()  # Signal to stop processing
An empty dict can also be a valid UDF return value.
I think we can avoid having this extra exception_queue and SENTINEL. Just check whether the result of out_batch = output_batch_queue.get() is an exception.
Awesome, that's right! I changed my modification to put the exception in the output queue instead.
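A minimal sketch of the approach agreed on above, where the exception travels through the same queue as the outputs and the consumer checks for it. This is not the Ray Data implementation: `double_rows`, `_drain`, `transform`, and `_DONE` are hypothetical names, and the `_DONE` end-of-stream marker is an assumption added here so the sketch can terminate on normal completion.

```python
import asyncio
import queue
import threading

_DONE = object()  # hypothetical end-of-stream marker (assumption, not from the PR)


async def double_rows(rows):
    """Hypothetical async UDF: yields doubled values, fails on negative input."""
    for row in rows:
        await asyncio.sleep(0)
        if row < 0:
            raise ValueError(f"negative row: {row}")
        yield row * 2


def _drain(rows, out_queue):
    """Run the async generator in this thread and forward outputs to the queue."""

    async def _run():
        try:
            async for out in double_rows(rows):
                out_queue.put(out)
            out_queue.put(_DONE)
        except Exception as e:
            # Forward the exception itself instead of using a second queue.
            out_queue.put(e)

    asyncio.run(_run())


def transform(rows):
    """Yield outputs as they arrive; re-raise any exception from the worker."""
    out_queue = queue.Queue()
    worker = threading.Thread(target=_drain, args=(rows, out_queue), daemon=True)
    worker.start()
    while True:
        out = out_queue.get()
        if out is _DONE:
            break
        if isinstance(out, Exception):
            worker.join()
            raise out
        yield out
    worker.join()
```

With this shape, `list(transform([1, 2]))` collects the doubled rows, while an input containing a negative row surfaces the `ValueError` in the caller instead of leaving it blocked on `get()`. One trade-off of the `isinstance` check is that a UDF which intentionally yields an exception object as data would be treated as a failure.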
close ray-project#47102 --------- Signed-off-by: zhilong <zhilong.chen@mail.mcgill.ca>
Why are these changes needed?
close #47102
Related issue number
#47102
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.