Change File info poller to respond when capacity in channel opens up and decode stream into Vec of structs #715
Conversation
```rust
                }
            }
            _ = cleanup_trigger.tick() => self.clean(&self.cache).await?,
            result = futures::future::try_join(sender.reserve().map_err(Error::from), self.get_next_file()) => {
```
Looks like it's safe to assume that the joined result is treated like collecting an iterator of results into a single result wrapping a collection, i.e. assuming both operations being joined are Ok(_), the joined result will be Ok((result_1_inner, result_2_inner))?
That is my understanding
https://docs.rs/futures/0.3.30/futures/future/fn.try_join.html
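For example (a minimal standalone sketch, not code from this PR, using only the futures crate): when both futures resolve to Ok, the joined future yields a single Ok wrapping the tuple of inner values, and the first Err short-circuits.

```rust
use futures::executor::block_on;
use futures::future::try_join;

fn main() {
    let a = async { Ok::<u32, String>(1) };
    let b = async { Ok::<&str, String>("file") };

    // Both futures are Ok, so the joined future resolves to Ok((1, "file")).
    let joined = block_on(try_join(a, b));
    assert_eq!(joined, Ok((1, "file")));
}
```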
This is an attempt to make the file_info_poller more responsive to capacity opening up in the mpsc::channel. It should help consumers avoid delays in processing data when they are backed up.
The file_info_poller also now converts the stream of bytes into a Vec of the decoded structs. Not sure this is a great idea, but we have seen that holding the stream to AWS open while the message sits in the channel waiting for the consumer can cause "connection reset by peer" errors when the consumer takes a while to process the stream or is really backed up. This change uses more memory but should allow the consumer to process data faster.
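A rough sketch of the idea, with hypothetical names (DecodedRecord, get_next_file, run) rather than the actual file_info_poller code: join sender.reserve() with the file fetch so a new file is only pulled when the channel has room, then drain the byte stream eagerly into an owned Vec before sending, so no connection stays open while the message waits in the channel.

```rust
use futures::{future, StreamExt, TryFutureExt};
use tokio::sync::mpsc;

#[derive(Debug)]
struct DecodedRecord(Vec<u8>);

#[derive(Debug)]
enum Error {
    Send,
    Fetch,
}

// Placeholder for the real S3/file fetch; yields a stream of raw byte buffers.
async fn get_next_file() -> Result<impl futures::Stream<Item = Vec<u8>>, Error> {
    Ok(futures::stream::iter(vec![vec![1u8], vec![2u8]]))
}

async fn run(sender: mpsc::Sender<Vec<DecodedRecord>>) -> Result<(), Error> {
    let mut cleanup_trigger = tokio::time::interval(std::time::Duration::from_secs(60));
    loop {
        tokio::select! {
            _ = cleanup_trigger.tick() => { /* periodic cache cleanup would go here */ }
            // Only proceed once the channel has capacity AND the next file is ready.
            result = future::try_join(
                sender.reserve().map_err(|_| Error::Send),
                get_next_file(),
            ) => {
                let (permit, stream) = result?;
                // Drain the stream eagerly so the upstream connection can close,
                // trading memory for resilience against "connection reset by peer".
                let records: Vec<DecodedRecord> = stream.map(DecodedRecord).collect().await;
                permit.send(records);
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let (tx, mut rx) = mpsc::channel(4);
    // A consumer that drains batches; the poller only fetches another file
    // once this channel has room for another batch.
    tokio::spawn(async move {
        while let Some(batch) = rx.recv().await {
            println!("received a batch of {} records", batch.len());
        }
    });
    run(tx).await
}
```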