Skip to content

Commit

Permalink
[DataPipe] Fix FullSync shutdown hanging issue while paused
Browse files Browse the repository at this point in the history
ghstack-source-id: 098b11f30792ceca6b68b93fd98d091a5e6cad19
Pull Request resolved: #1153
  • Loading branch information
NivekT committed May 5, 2023
1 parent 11bb5b8 commit 70778db
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
8 changes: 8 additions & 0 deletions test/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ def _test_fullsync(rank, world_size, backend, q):
except Exception as e:
assert isinstance(e, PrefetchTimeoutError)

# Test that reset/shutdown does not hang while paused
dp3 = dp.fullsync()
it = iter(dp3)
next(it)
dp3.pause()
it2 = iter(dp3) # Reset
next(it2)

_finalize_distributed_queue(rank, q)

@world_size_parametrize
Expand Down
1 change: 1 addition & 0 deletions torchdata/datapipes/iter/util/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def return_next(self):
return data

# Shut down this object's executor, clearing the paused flag first.
def shutdown(self):
# Clear the pause flag BEFORE the blocking shutdown below; the order matters.
# NOTE(review): presumably the worker task waits while `_paused` is set, so
# leaving it True would keep `shutdown(wait=True)` from ever returning
# (the hang while paused that this change addresses) -- confirm against the
# worker loop.
self._paused = False
# wait=True: do not return until the executor has finished shutting down.
# NOTE(review): assumes `_executor` follows the concurrent.futures.Executor
# interface -- confirm where it is constructed.
self._executor.shutdown(wait=True)

def pause(self):
Expand Down

0 comments on commit 70778db

Please sign in to comment.