Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a buffered wrapper around BytesStream #1501

Merged
merged 3 commits into from
Dec 17, 2024

Conversation

akoshelev
Copy link
Collaborator

The need for it is driven by the behavior we're observing from Report Collector sending bytes down to individual shards. It writes data as it becomes available and Hyper does not accumulate it before sending. On the receiver side we are seeing chunks of size 1 received and that creates thrashing on sender/receiver side.

This change paves the path to use buffering on RC side.

The need for it is driven by the behavior we're observing from Report Collector sending bytes down to individual shards. It writes data as it becomes available and Hyper does not accumulate it before sending. On the receiver side we are seeing chunks of size 1 received and that creates thrashing on sender/receiver side.

This change paves the path to use buffering on RC side.
Copy link

codecov bot commented Dec 17, 2024

Codecov Report

Attention: Patch coverage is 98.03922% with 3 lines in your changes missing coverage. Please review.

Project coverage is 93.24%. Comparing base (69f0a5d) to head (0b8254f).
Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
ipa-core/src/helpers/transport/stream/buffered.rs 98.03% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1501      +/-   ##
==========================================
+ Coverage   93.02%   93.24%   +0.22%     
==========================================
  Files         237      238       +1     
  Lines       43535    43688     +153     
==========================================
+ Hits        40498    40739     +241     
+ Misses       3037     2949      -88     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

// verify_success(infallible_stream(12, 5), 12).await;
// verify_success(infallible_stream(12, 12), 12).await;
// verify_success(infallible_stream(24, 12), 12).await;
// verify_success(infallible_stream(24, 12), 1).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: clean these up

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I want to get them back

/// done. This may need to be used when writing into HTTP streams as Hyper
/// does not provide any buffering functionality and we turn NODELAY on
#[pin_project]
pub struct BufferedBytesStream<S> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few nits

To me BufferedBytesStream doesn't tell why this stream is special. I think the key thing is the poll with is being "chunked". I would rather call this BufferedChunkedStream

why not have the trait bound S: BytesStream here as well? Thinking more about documentation/readability

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bounds need to be repeated everywhere if put on the struct - generally we try to avoid that if that's not necessary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not attempt to chunk the inner stream, the whole purpose of it is to accumulate enough bytes (buffer) before sending them down for processing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But doesn't the output get chunked? This adapter is buffering and chunking... another name would be paginating.

I still think BufferedBytesStream is vague. My 2 cents.

/// Number of bytes released per single poll.
/// All items except the last one are guaranteed to have
/// exactly this number of bytes written to them.
sz: usize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: chunk_size?

@akoshelev akoshelev merged commit f148eac into private-attribution:main Dec 17, 2024
12 checks passed
akoshelev added a commit to akoshelev/raw-ipa that referenced this pull request Dec 17, 2024
Continuation of private-attribution#1501, we want to avoid submitting reports one by one from RC to each individual shard. That likely leads to fragmentation and we've been observing slow execution on the client side.

This sets up the buffer size to be divisible by TCP MSS, but I don't have any real evidence that this is going to work well. We would need to experiment with it
akoshelev added a commit that referenced this pull request Jan 9, 2025
* Use stream buffering in report collector

Continuation of #1501, we want to avoid submitting reports one by one from RC to each individual shard. That likely leads to fragmentation and we've been observing slow execution on the client side.

This sets up the buffer size to be divisible by TCP MSS, but I don't have any real evidence that this is going to work well. We would need to experiment with it

* Use 8Kb buffers for streaming inside report collector
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants