asyncmapper: shutdown producer on generator close #597

Merged on Nov 20, 2024 (1 commit)

@skshetry skshetry commented Nov 14, 2024

Fixes the hanging issue when trying to cancel or close the script.

The test on #521 hangs. This is because we run the producer on a separate ThreadPoolExecutor in a loop.

def _produce(self) -> None:
    for item in self.iterable:
        fut = asyncio.run_coroutine_threadsafe(self.work_queue.put(item), self.loop)
        fut.result()  # wait until the item is in the queue

async def produce(self) -> None:
    await self.to_thread(self._produce)

async def to_thread(self, func, *args):
    return await self.loop.run_in_executor(self.pool, func, *args)

When the interpreter shuts down, ThreadPoolExecutor waits for all of its worker threads to join(). Since _produce() is still running in its worker thread, join() hangs indefinitely.
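
The blocking behaviour can be reproduced in isolation. The following is a minimal sketch and not the project's code: `blocked_worker` and `release` are hypothetical names, and the event stands in for a producer loop that never finishes on its own. It shows that the worker stays busy until its function returns, which is what the join at shutdown waits on.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for "the producer loop has finished".
release = threading.Event()


def blocked_worker() -> str:
    # Simulates a producer that keeps running until it is told to stop.
    release.wait()
    return "done"


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(blocked_worker)

time.sleep(0.1)
still_running = not future.done()  # the worker is still blocked here

# Without this call, pool.shutdown(wait=True) would block for as long
# as the worker function keeps running.
release.set()
pool.shutdown(wait=True)
result = future.result()
```

In this sketch the event is set explicitly, so the join returns promptly. A loop that is never told to stop gives the join nothing to wait for.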

Test Failure Analysis

Although the script hangs on other platforms too, you might notice that the CI failed only on the Windows tests on #521.

This is because test_query_e2e sends multiple SIGINT signals (or CTRL_C on Windows). On the first SIGINT, Python raises a KeyboardInterrupt and tries to exit, but it gets blocked by thread.join(). On the next SIGINT, join() raises an exception and exits.

The latter happens only on non-Windows platforms; on Windows, thread.join() would hang indefinitely.

Before this PR, on non-Windows platforms, it would take 2 SIGINT signals to terminate the script.

Solution

This PR adjusts the produce() loop to check for a "shutdown" signal, and sets the signal on exit (either on success or failure).

produce() also puts items into the work_queue, and that call can block if the queue is full. To solve that, we also drain items from the queue, so that the blocked put completes and the producer notices the "shutdown" signal quickly.
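
The approach described above could look roughly like the following. This is a hedged sketch under stated assumptions, not the code merged in this PR: the `Producer` class, the `shutdown` event, the `close()` method, and the polling interval are all hypothetical names and choices made for illustration.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from itertools import count


class Producer:
    """Hypothetical, simplified stand-in for the producer side of a mapper."""

    def __init__(self, iterable, loop, maxsize=2):
        self.iterable = iterable
        self.loop = loop
        self.work_queue = asyncio.Queue(maxsize)
        self.pool = ThreadPoolExecutor(max_workers=1)
        self.shutdown = threading.Event()  # the "shutdown" signal

    def _produce(self) -> None:
        for item in self.iterable:
            if self.shutdown.is_set():
                return  # stop instead of producing forever
            fut = asyncio.run_coroutine_threadsafe(
                self.work_queue.put(item), self.loop
            )
            fut.result()  # blocks this thread while the queue is full

    async def produce(self) -> None:
        await self.loop.run_in_executor(self.pool, self._produce)

    async def close(self, task: asyncio.Task) -> None:
        self.shutdown.set()
        # Drain the queue so a blocked put() can complete; the producer
        # then sees the shutdown signal on its next iteration.
        while not task.done():
            while not self.work_queue.empty():
                self.work_queue.get_nowait()
            await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()
    producer = Producer(count(), loop)  # infinite iterable
    task = asyncio.create_task(producer.produce())
    try:
        first = await producer.work_queue.get()
        await asyncio.sleep(0.05)  # let the queue fill up
    finally:
        # Signal shutdown on exit, whether we succeeded or failed.
        await producer.close(task)
    await task
    producer.pool.shutdown(wait=True)  # returns promptly now
    return first, task.done()


first, finished = asyncio.run(main())
```

Without the drain step in this sketch, the worker thread would stay blocked in `fut.result()` on a full queue and never reach the shutdown check.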

cloudflare-workers-and-pages bot commented Nov 14, 2024

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 14a0842
Status: ✅  Deploy successful!
Preview URL: https://d53f49f5.datachain-documentation.pages.dev
Branch Preview URL: https://issue-40-2.datachain-documentation.pages.dev

codecov bot commented Nov 14, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 88.00%. Comparing base (9fd3155) to head (14a0842).
Report is 1 commits behind head on issue-40.

Additional details and impacted files
@@             Coverage Diff              @@
##           issue-40     #597      +/-   ##
============================================
+ Coverage     87.89%   88.00%   +0.10%     
============================================
  Files           100      100              
  Lines         10041    10052      +11     
  Branches       1365     1367       +2     
============================================
+ Hits           8826     8846      +20     
+ Misses          871      865       -6     
+ Partials        344      341       -3     
Flag Coverage Δ
datachain 87.94% <100.00%> (+0.06%) ⬆️

@skshetry skshetry marked this pull request as ready for review November 14, 2024 05:39
@skshetry skshetry marked this pull request as draft November 15, 2024 05:33
@skshetry skshetry marked this pull request as ready for review November 18, 2024 09:31
@skshetry skshetry merged commit 56cc2ad into issue-40 Nov 20, 2024
38 checks passed
@skshetry skshetry deleted the issue-40-2 branch November 20, 2024 02:32
skshetry added a commit that referenced this pull request Nov 20, 2024
* Use threading in AsyncMapper.produce()

* Implement prefetching in .gen() and .map()

* Avoid user code error in name_len()

* asyncmapper: shutdown producer on generator close (#597)

---------

Co-authored-by: skshetry <18718008+skshetry@users.noreply.github.com>