Guarantee datapipe being reset iterator when all loops have received reset request in the dispatching process #994
Conversation
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
test/test_remote_io.py (Outdated)
[["s3://ai2-public-datasets/charades/"], 18], # folder without '/' | ||
[["s3://ai2-public-datasets/charad"], 18], # prefix | ||
[ | ||
( |
Change 1
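For context, here is a hedged sketch of how parameters like these might drive an S3 listing test; the helper name and structure are assumptions, not the actual test_remote_io.py code:

from torchdata.datapipes.iter import IterableWrapper, S3FileLister

def check_s3_urls(urls, expected_count):
    # Each parameter pairs a list of S3 URLs (a folder, a folder without a
    # trailing '/', or a bare prefix) with the number of keys the lister
    # should yield for it.
    lister = S3FileLister(IterableWrapper(urls))
    assert len(list(lister)) == expected_count

check_s3_urls(["s3://ai2-public-datasets/charad"], 18)  # prefix case from above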
def __del__(self):
    try:
        self.finalize()
    except AttributeError:
        pass
Change 2
Just a comment - a little bit surprised we need this, given ProtoMultiRS's finalize seems to be catching every possible AttributeError? Maybe the issue is Distributed? Thoughts?
ProtoMPRS doesn't handle every AttributeError, for example the self._worker_processes access in torchdata/dataloader2/reading_service.py (line 345 at commit 98222ad):

for process, req_queue, res_queue in self._worker_processes:

Technically speaking, I should remove those try-except clauses in finalize to simplify the codebase.
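To illustrate why the guard in __del__ is still needed, a minimal sketch with assumed names (not the actual reading_service.py code): if __init__ raises before the attribute assignment, CPython still runs __del__ on the partially built object, and finalize() then hits an AttributeError on self._worker_processes.

class MultiProcessingReadingService:  # hypothetical stand-in
    def __init__(self, num_workers):
        if num_workers < 1:
            raise ValueError("num_workers must be >= 1")  # __del__ still runs after this
        self._worker_processes = []  # never assigned when the error above fires

    def finalize(self):
        # Unguarded attribute access, mirroring the line 345 cited above.
        for process, req_queue, res_queue in self._worker_processes:
            pass  # the real implementation shuts the workers down here

    def __del__(self):
        try:
            self.finalize()
        except AttributeError:
            pass  # tolerate partially constructed objects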
# Ensure only reset iterator once for the dispatching process
if reset_iterator_counter is not None:
    reset_iterator_counter.increment()
    while not reset_iterator_counter.is_reached():
        yield True
    # Sync between loops within the dispatching process
    source_datapipe.reset_iterator()
    yield True
    reset_iterator_counter.reset()
Change 3
You might need to do something similar to this for resume in the dispatching process. cc: @NivekT
Just to confirm I understand - this is to handle the situation where some workers are handling GetNextRequest while some are trying to reset? You want all GetNext to be done before the dispatching process executes reset?
Not workers. It only happens in the dispatching process, when multiple leaf DataPipes share the same data source (round-robin demux on the same DataPipe) in a single process.
It handles the case where some loops have received reset while the others haven't. We want to wait to request getNext until all loops have received reset. Otherwise, the data source could be reset in the middle of iteration for the other loops.
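To make the synchronization concrete, here is a hedged reconstruction of what reset_iterator_counter might look like; the class name and details are assumptions, not the actual torchdata helper. Each loop calls increment() once and spins on is_reached() until every loop has checked in, so the shared source is only reset after all loops agree:

class _ResetCounter:  # hypothetical reconstruction
    def __init__(self, num_loops):
        self.num_loops = num_loops
        self._count = 0
        self._reached = False

    def increment(self):
        self._count += 1

    def is_reached(self):
        if self._reached:  # memoized fast path (see the nit below)
            return self._reached
        if self._count == self.num_loops:
            self._reached = True
        return self._reached

    def reset(self):
        self._count = 0
        self._reached = False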
I will rerun all tests tomorrow when the PyTorch nightly has been updated.
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
LGTM! Hope the nightly gets uploaded and all the CIs get fixed
if self._reached:
    return self._reached
nit: These two lines can be removed? But I guess it is slightly faster, so I'm indifferent.
Lol, you are right. I will remove them tomorrow.
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
The test takes way more time to finish. However, I can't really reproduce it either on Linux or Mac. Edit: found the culprit test.
And I am going to remove all S3-related commits. To fix the S3 test, I plan to rely on #997.
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Changes
- Fix S3 tests, in "Fix test_remote_io.py due to mutating public s3 bucket" #997.
- When gc gets involved, remove thread from Prefetcher. This would prevent a racing condition when both the finally block in the generator and the reset function are accessing the same thread.
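As an illustration of that race, a hedged sketch with assumed names (not the actual Prefetcher implementation): when gc finalizes an abandoned generator, its finally block runs and may touch the same thread attribute that reset is clearing concurrently.

import threading
import time

class Prefetcher:  # simplified stand-in
    def __init__(self):
        self.thread = None

    def prefetch(self):
        self.thread = threading.Thread(target=time.sleep, args=(0.1,))
        self.thread.start()
        try:
            while True:
                yield "item"
        finally:
            # Runs when gc collects the suspended generator ...
            if self.thread is not None:
                self.thread.join()
                self.thread = None

    def reset(self):
        # ... possibly at the same time as this, so self.thread can become
        # None between the check and the join.
        if self.thread is not None:
            self.thread.join()
            self.thread = None

Dropping the thread reference before gc can finalize the generator removes the shared state these two paths race on.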