Implementing thread based PrefetcherIterDataPipe #770

Closed

Conversation

@VitalyFedyunin (Contributor) commented Sep 9, 2022

VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 30bea32365cafafd00f125e744ed9cd76d86ef6c
Pull Request resolved: #770
@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Sep 9, 2022
VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 25bee7325112b321f403b2805dd9f6d23bd90af1
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 9, 2022
ghstack-source-id: 635a985ea38c220345a1b7c08d5220e7a24c15c1
Pull Request resolved: #770
@ejguan (Contributor) left a comment

A few comments about threading below.

            time.sleep(PRODUCER_SLEEP_INTERVAL)

    def __iter__(self):
        self.reset()
Contributor:

nit: reset can be omitted.

torchdata/datapipes/iter/util/prefetcher.py (resolved review comment)
Comment on lines +24 to +25
        # TODO: Potential optimization is changing buffer from list to dequeue
        self.prefetch_buffer = []
Contributor:

Yeah, I agree with changing to a deque since it's thread-safe.
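A minimal sketch of the suggested swap, assuming a single producer thread and a single consumer thread share the buffer; the class shape follows the snippets quoted in this thread rather than the PR's exact code.

from collections import deque


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.run_prefetcher = True
        # deque instead of list: append() and popleft() are individually atomic
        # in CPython, and popleft() is O(1) where list.pop(0) would be O(n).
        self.prefetch_buffer = deque()
        self.buffer_size = buffer_size
        self.source_datapipe = source_datapipe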


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.run_prefetcher = True
Contributor:

And, should we add a thread lock around run_prefetcher?

Contributor Author:

My thread lock is the GIL =)

Contributor:

Lol. Kinda makes sense.
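The exchange above relies on CPython's GIL keeping single attribute reads and writes atomic. For reference, a hedged sketch of the explicit alternative, a threading.Event as the stop flag; the names and sleep values are illustrative, and this is not what the PR itself does.

import threading
import time


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.stop_event = threading.Event()  # .set() plays the role of run_prefetcher = False
        self.source_datapipe = source_datapipe
        self.buffer_size = buffer_size
        self.prefetch_buffer = []


def thread_worker(prefetch_data):
    # Keep producing until the main thread signals shutdown.
    while not prefetch_data.stop_event.is_set():
        # ... fill prefetch_buffer from source_datapipe here ...
        time.sleep(0.001)


prefetch_data = _PrefetchData(source_datapipe=range(10), buffer_size=4)
worker = threading.Thread(target=thread_worker, args=(prefetch_data,), daemon=True)
worker.start()
prefetch_data.stop_event.set()  # signal the worker thread to exit
worker.join()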

Comment on lines +48 to +51
            except communication.iter.InvalidStateResetRequired:
                stop_iteration = True
            except communication.iter.TerminateRequired:
                prefetch_data.run_prefetcher = False
Contributor:

Do we need to handle these two exceptions? communication is more or less a submodule of MultiprocessingReadingService; I would personally feel better keeping those exceptions out of a DataPipe.

Contributor Author:

I have to handle them here, because this is a separate thread that needs to be terminated gracefully when the source DataPipe goes out of commission.

Contributor:

Oh, I see. This prefetcher is attached not only to the main process but also to child processes.
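For context, a sketch of the pattern being described: the prefetch loop runs in its own thread, so it has to absorb the reset/terminate signals itself. The exception classes are stubbed locally here (the real ones live in torchdata's communication.iter module); the rest is illustrative, not the PR's exact code.

import time
from collections import deque

PRODUCER_SLEEP_INTERVAL = 0.0001  # illustrative back-off when the buffer is full


# Local stand-ins for communication.iter.InvalidStateResetRequired and
# communication.iter.TerminateRequired so this sketch is self-contained.
class InvalidStateResetRequired(Exception):
    pass


class TerminateRequired(Exception):
    pass


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.run_prefetcher = True
        self.prefetch_buffer = deque()
        self.buffer_size = buffer_size
        self.source_datapipe = source_datapipe


def thread_worker(prefetch_data):
    # Runs in its own thread, so shutdown signals raised by the source DataPipe
    # (e.g. when a child process of the reading service is torn down) must be
    # caught here; otherwise the thread dies with an unhandled exception instead
    # of exiting cleanly.
    itr = iter(prefetch_data.source_datapipe)
    stop_iteration = False
    while prefetch_data.run_prefetcher:
        if len(prefetch_data.prefetch_buffer) < prefetch_data.buffer_size and not stop_iteration:
            try:
                prefetch_data.prefetch_buffer.append(next(itr))
            except StopIteration:
                stop_iteration = True
            except InvalidStateResetRequired:
                stop_iteration = True  # source asked for a reset; stop producing
            except TerminateRequired:
                prefetch_data.run_prefetcher = False  # source is shutting down; leave the loop
        elif stop_iteration and len(prefetch_data.prefetch_buffer) == 0:
            prefetch_data.run_prefetcher = False
        else:
            time.sleep(PRODUCER_SLEEP_INTERVAL)  # buffer is full; back off briefly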

VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: 4fc437ef879ad557340db8b547cbdf3625bc5acf
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: dd95cfdcba960cea4294bc7207c3ee005cf0fca5
Pull Request resolved: #770
VitalyFedyunin added a commit that referenced this pull request Sep 21, 2022
ghstack-source-id: 5fdc9ba2e6420371c4c4069039bd88fa6b902674
Pull Request resolved: #770
@ejguan (Contributor) left a comment

LGTM
I assume tests will be added after the corresponding changes are made to PrototypeMPRS.

@NivekT (Contributor) left a comment

Do we intend for this DataPipe to be user-facing? (I'm guessing not?) If it is, it would be good to add a docstring and add it to torchdata.datapipes.iter.rst.

Comment on lines +104 to +105
    def reset_iterator(self):
        self.reset()
Contributor:

Question: What is the expected behavior of reset_iterator in PrototypeRS? How is that different from the usual DataPipe reset?

Comment on lines +61 to +62
        if self.buffer_size < 1:
            yield from self.source_datapipe
Contributor:

This case should not be possible because of the check in __init__.
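The check being referred to, sketched under the assumption that __init__ rejects non-positive buffer sizes; the default value and error message are illustrative.

class PrefetcherIterDataPipe:
    def __init__(self, source_datapipe, buffer_size: int = 10):
        self.source_datapipe = source_datapipe
        if buffer_size <= 0:
            # With this guard, the `if self.buffer_size < 1` branch in __iter__
            # is unreachable, which is what the comment above points out.
            raise ValueError("'buffer_size' is required to be a positive integer.")
        self.buffer_size = buffer_size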

Comment on lines +65 to +66
        prefetch_data = _PrefetchData(self.source_datapipe, self.buffer_size)
        self.prefetch_data = prefetch_data
Contributor:

nit: Rename these to prefetcher_thread_worker or something?

@VitalyFedyunin (Contributor Author) has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

VitalyFedyunin added a commit that referenced this pull request Sep 26, 2022
ghstack-source-id: 0bbf6ffe1ecb74ffd182c081328d61440fa195d2
Pull Request resolved: #770
@VitalyFedyunin (Contributor Author) has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

VitalyFedyunin added a commit that referenced this pull request Oct 3, 2022
ghstack-source-id: 6b9c54f3ba2786e6c3fe7a47cfd0fe3387451635
Pull Request resolved: #770
@VitalyFedyunin (Contributor Author) has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot facebook-github-bot deleted the gh/VitalyFedyunin/21/head branch October 7, 2022 14:19