Unblock ProtoMPRS to control determinism of DataPipe in single/multi-processing and dist/non-dist env #827
Conversation
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
```diff
 ):
     pass

-def SpawnProcessForDataPipeline(multiprocessing_ctx, datapipe, call_locally_fn=None, call_on_reset_epoch=None):
+def SpawnProcessForDataPipeline(multiprocessing_ctx, datapipe, call_on_process_init=None, call_on_epoch_reset=None):
```
I changed these argument names to clarify their functionality.
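For context, a minimal sketch of how the renamed hooks might be wired up; the import paths, callback signatures, and return values here are assumptions for illustration, not the verbatim torchdata API:

```python
import torch
from torchdata.datapipes.iter import IterableWrapper
# Assumed import path; it may differ across torchdata versions.
from torchdata.dataloader2.communication.eventloop import SpawnProcessForDataPipeline

def on_process_init(datapipe):
    # Hypothetical: runs once inside the freshly spawned worker,
    # e.g. to seed process-local RNGs.
    pass

def on_epoch_reset(datapipe):
    # Hypothetical: runs when the main process resets the epoch,
    # e.g. to re-derive per-epoch seeds.
    pass

ctx = torch.multiprocessing.get_context("spawn")
dp = IterableWrapper(range(10))
# Assumed to return the worker process plus request/response queues.
process, req_queue, res_queue = SpawnProcessForDataPipeline(
    ctx,
    dp,
    call_on_process_init=on_process_init,
    call_on_epoch_reset=on_epoch_reset,
)
process.start()
```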
```diff
@@ -174,38 +188,72 @@ def __init__(
         self.multiprocessing_context = multiprocessing_context
         self.processes = []
         self.datapipes = []
-        self.combined_datapipes = None
+        self.end_datapipe = None
```
I changed it to `end_datapipe` because we need to store the last DataPipe for both the in-process and multiprocessing cases.
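A rough sketch of the idea (an assumption about the surrounding code, not an excerpt from it): one attribute always points at whatever DataPipe iteration should start from, regardless of mode.

```python
def initialize(self, datapipe):
    if self.num_workers == 0:
        # In-process: the user's pipeline is itself the last DataPipe.
        self.end_datapipe = datapipe
    else:
        # Multiprocessing: the last DataPipe drains the per-worker queues.
        self.end_datapipe = _IterateQueueDataPipes(self.datapipes)
```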
```diff
         )

+        # Multiprocessing (num_workers > 0)
+        if isinstance(self.end_datapipe, _IterateQueueDataPipes):
```
This will conflict with my prefetcher fix.
I will rebase once your PR lands. I still need to add a test for process-local RNGs.
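One hypothetical shape for that test (everything here is an assumption about how it could be written): have each worker stamp items with a fingerprint of its global RNGs, then assert the fingerprints differ across workers.

```python
import random

import numpy as np
import torch

def rng_fingerprint(item):
    # Mapped over the DataPipe so it executes inside each worker process,
    # drawing once from every process-local global RNG.
    return (item, random.random(), torch.rand(1).item(), np.random.rand())

# The test would collect these tuples from all workers and assert that
# no two workers produced the same (random, torch, numpy) triple.
```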
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
LGTM
Force-pushed from 7cc917a to ad8da32
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
This PR temporarily extends `PrototypingMultiProcessingReadingService` to fully control the determinism of the pipeline in the combinations of:

- Single/Multi-processing
- Distributed/Non-distributed

When we have `SequentialReadingService` ready to combine `DistributedReadingService` and `PrototypingMultiProcessingReadingService`, some of this code should be removed. And, for the in-process reading service, we still need a method to isolate the global RNGs to prevent the data pipeline from interfering with the model's randomness.

For the multiprocessing case, it sets the same random seed for `Shuffler` and different deterministic seeds for the global RNGs (Python's `random`, `torch`, and `numpy`) within each subprocess.

For the distributed case, it shares the same random seed for `Shuffler` across all distributed processes to guarantee the shuffle order before sharding.

Tests:

All tests are executed in the combinations of the environments above.

- [x] Validate that the same seed generates the same order of data.
- [x] Validate that different seeds generate different orders of data.
- [x] Validate that the data after shuffling and sharding in each worker is mutually exclusive and collectively exhaustive, with and without a manual seed.

There is one missing test I will add tomorrow:

- [x] Validate that subprocess-local RNGs (`random`, `torch`, and `numpy`) are properly set with different seeds.
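As a rough sketch of the seeding scheme described above (the helper name and the seed-derivation formula are assumptions; the actual derivation in the PR may differ):

```python
import random

import numpy as np
import torch

def seed_worker(shared_seed: int, worker_id: int) -> int:
    # Every worker (and every distributed rank) applies the same shared
    # seed to the Shuffler, so the pre-sharding shuffle order matches.
    shuffle_seed = shared_seed

    # Each worker derives its own deterministic seed for the global RNGs,
    # so pipeline randomness never collides across workers.
    worker_seed = shared_seed + worker_id + 1
    random.seed(worker_seed)
    torch.manual_seed(worker_seed)
    np.random.seed(worker_seed % (2 ** 32))
    return shuffle_seed
```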