-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor connection handling #1060
Refactor connection handling #1060
Conversation
Benchmarks as of now: mainasync-nats: publish throughput/32
time: [202.71 µs 208.83 µs 215.22 µs]
thrpt: [14.180 MiB/s 14.614 MiB/s 15.055 MiB/s]
async-nats: publish throughput/1024
time: [331.07 µs 343.49 µs 356.15 µs]
thrpt: [274.20 MiB/s 284.30 MiB/s 294.97 MiB/s]
Found 1 outliers among 30 measurements (3.33%)
1 (3.33%) high mild
async-nats: publish throughput/8192
time: [2.2131 ms 2.3912 ms 2.5751 ms]
thrpt: [303.39 MiB/s 326.72 MiB/s 353.01 MiB/s]
async-nats: publish messages amount/32
time: [200.31 µs 205.57 µs 211.80 µs]
thrpt: [472.13 Kelem/s 486.45 Kelem/s 499.22 Kelem/s]
Found 2 outliers among 30 measurements (6.67%)
2 (6.67%) high mild
async-nats: publish messages amount/1024
time: [325.23 µs 335.40 µs 346.18 µs]
thrpt: [288.86 Kelem/s 298.15 Kelem/s 307.48 Kelem/s]
Found 2 outliers among 30 measurements (6.67%)
1 (3.33%) high mild
1 (3.33%) high severe
async-nats: publish messages amount/8192
time: [1.9831 ms 2.2042 ms 2.4116 ms]
thrpt: [41.466 Kelem/s 45.367 Kelem/s 50.427 Kelem/s]
subscribe amount/32 time: [1.5607 ms 1.6374 ms 1.6978 ms]
thrpt: [58.900 Kelem/s 61.074 Kelem/s 64.075 Kelem/s]
subscribe amount/1024 time: [1.1902 ms 1.2250 ms 1.2595 ms]
thrpt: [79.400 Kelem/s 81.633 Kelem/s 84.018 Kelem/s]
subscribe amount/8192 time: [4.2955 ms 4.3576 ms 4.4296 ms]
thrpt: [22.575 Kelem/s 22.949 Kelem/s 23.280 Kelem/s]
Found 2 outliers among 30 measurements (6.67%)
2 (6.67%) high severe this branch against mainasync-nats: publish throughput/32
time: [190.19 µs 196.71 µs 203.55 µs]
thrpt: [14.992 MiB/s 15.514 MiB/s 16.045 MiB/s]
change:
time: [-8.7393% -4.7959% -0.8225%] (p = 0.03 < 0.05)
thrpt: [+0.8294% +5.0375% +9.5762%]
Change within noise threshold.
Found 1 outliers among 30 measurements (3.33%)
1 (3.33%) high mild
async-nats: publish throughput/1024
time: [308.62 µs 318.89 µs 329.49 µs]
thrpt: [296.38 MiB/s 306.23 MiB/s 316.43 MiB/s]
change:
time: [-8.6554% -2.5471% +3.8930%] (p = 0.46 > 0.05)
thrpt: [-3.7471% +2.6137% +9.4756%]
No change in performance detected.
Found 2 outliers among 30 measurements (6.67%)
1 (3.33%) high mild
1 (3.33%) high severe
async-nats: publish throughput/8192
time: [1.4773 ms 1.5160 ms 1.5551 ms]
thrpt: [502.36 MiB/s 515.34 MiB/s 528.85 MiB/s]
change:
time: [-37.950% -31.428% -24.041%] (p = 0.00 < 0.05)
thrpt: [+31.650% +45.833% +61.161%]
Performance has improved.
Found 3 outliers among 30 measurements (10.00%)
2 (6.67%) high mild
1 (3.33%) high severe
async-nats: publish messages amount/32
time: [192.45 µs 197.46 µs 202.29 µs]
thrpt: [494.35 Kelem/s 506.44 Kelem/s 519.61 Kelem/s]
change:
time: [-11.902% -7.2112% -2.1045%] (p = 0.01 < 0.05)
thrpt: [+2.1497% +7.7717% +13.510%]
Performance has improved.
Found 1 outliers among 30 measurements (3.33%)
1 (3.33%) high mild
async-nats: publish messages amount/1024
time: [309.51 µs 318.95 µs 330.52 µs]
thrpt: [302.55 Kelem/s 313.53 Kelem/s 323.09 Kelem/s]
change:
time: [-10.980% -3.2048% +4.6338%] (p = 0.45 > 0.05)
thrpt: [-4.4286% +3.3109% +12.334%]
No change in performance detected.
Found 3 outliers among 30 measurements (10.00%)
1 (3.33%) high mild
2 (6.67%) high severe
async-nats: publish messages amount/8192
time: [1.4486 ms 1.4860 ms 1.5305 ms]
thrpt: [65.339 Kelem/s 67.293 Kelem/s 69.030 Kelem/s]
change:
time: [-31.125% -23.573% -15.210%] (p = 0.00 < 0.05)
thrpt: [+17.938% +30.843% +45.191%]
Performance has improved.
Found 3 outliers among 30 measurements (10.00%)
1 (3.33%) high mild
2 (6.67%) high severe
subscribe amount/32 time: [4.3207 ms 4.4703 ms 4.6061 ms]
thrpt: [21.710 Kelem/s 22.370 Kelem/s 23.144 Kelem/s]
change:
time: [+173.31% +198.66% +227.86%] (p = 0.00 < 0.05)
thrpt: [-69.499% -66.517% -63.412%]
Performance has regressed.
Found 3 outliers among 30 measurements (10.00%)
1 (3.33%) low mild
2 (6.67%) high mild
subscribe amount/1024 time: [1.3827 ms 1.4205 ms 1.4670 ms]
thrpt: [68.166 Kelem/s 70.398 Kelem/s 72.321 Kelem/s]
change:
time: [+14.539% +18.584% +23.016%] (p = 0.00 < 0.05)
thrpt: [-18.710% -15.672% -12.694%]
Performance has regressed.
Found 2 outliers among 30 measurements (6.67%)
2 (6.67%) high mild
subscribe amount/8192 time: [4.2687 ms 4.3939 ms 4.5255 ms]
thrpt: [22.097 Kelem/s 22.759 Kelem/s 23.426 Kelem/s]
change:
time: [-3.3419% +3.2701% +9.7699%] (p = 0.35 > 0.05)
thrpt: [-8.9003% -3.1665% +3.4574%]
No change in performance detected.
Found 2 outliers among 30 measurements (6.67%)
2 (6.67%) high mild I'll look into what's happening with subscriptions |
c2ceea6
to
712ef12
Compare
There's a problem I didn't take into account before: calling I'm kind of feeling like we could live without it. When it comes to SQL databases for example I don't think I ever needed to flush the connection manually, so why should that apply for NATS? The real missing thing in nats.rs is the ability to wait for acknowledgments from the Core NATS protocol. Right now calling |
53ce952
to
0d9c518
Compare
+OK is only when verbose mode is on, and it isn't tied to any particular request 🤔 |
Ah. I wasn't expecting that 😄. So best case scenario we know when a command was written to the buffer. Nothing more 😞 |
There are definitely use cases where minimum latency is vastly more important than throughput. |
|
How does an HTTP library compare to NATS? If you look back into issues on this repo, you will find people at odds with async's built-in buffering and using Breaking flush as you have described (waiting on future things) I would consider as a breaking change, and one that ought to really be reconsidered. |
As @caspervonb mentioned, you get NATS is pretty specific because you might be publishing just few bytes and want to reach subscribers as soon as possible - with latency almost equal to latency between clients and the server. The very low latency is critical aspect of NATS that we can't compromise. In many NATS use cases, every millisecond counts. That balance between as-high-as-possible throughput and as-low-as-possible latency will probably be impossible with just plain and simple non-flushing behaviour. Maybe async I/O will make it simpler in the future (and for example allow not buffering at all?), but still we probably should not compromise older systems. That's why we were thinking about getting rid of BufWriter and doing our own implementaton, where we can flush with more control. |
I agree that NATS should optimize for real time while also having the option to be more lazy in it's flushing, allowing less but bigger network packets to be sent. The reason why I said manual calls to Currently by default a 1ms flush interval is configured. This means that compared to an implementation which flushes after every write, here worse case scenario you get 1ms extra write latency (except that you're not really getting 1ms. Continue reading). I don't like the timer approach, but I feel like this it not a bad default. For the extreme realtime cases I feel like no timer should be present at all, and instead flushing should happen as soon as at least one command is ready to be sent. Asking the user to manually flush is a giant workaround. There are a bunch of libraries that can autonomously flush without having the user do it manually. The implementation already does write pipelining. Manual flushing actually slows down the library user from publishing quicker. It doesn't make it faster unless they With that said let me explain what this PR really breaks: this PR does not break or remove flushing, it actually gives the entire write path the chance to max out what the TCP connection is capable of (excluding vectored writes which is another very nice improvement which will come later). The only thing it breaks is that if you manually call Before
After
As you can see the actual write performance increased, because while the kernel was flushing we were able to add more data to the available portion of it's buffers (or other userspace buffers we have going on). Plus let's not forget we were able to read this whole time. This actually potentially allows us to send bigger network packets. The only problem is that because |
While a custom The bottleneck is that you're not taking full advantage of what the current async tools are giving you. Reading and writing should happen concurrently between each other and with flushing, otherwise you'll have dead moments where no I/O is happening in userspace (and to some extent in the kernel). And whatever you do you can't live without flushing. Guaranteed when dealing with rustls or other TLS libraries. |
@paolobarbolini I just did a very simple test: #[tokio::test]
async fn request_bench() {
use futures::stream::StreamExt;
let server = nats_server::run_basic_server();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn({
let url = server.client_url().clone();
async move {
let client = async_nats::connect(url).await.unwrap();
let mut subscription = client.subscribe("request".into()).await.unwrap();
client.flush().await.unwrap();
tx.send(()).unwrap();
while let Some(message) = subscription.next().await {
client
.publish(message.reply.unwrap(), "".into())
.await
.unwrap();
client.flush().await.unwrap();
}
}
});
let client = async_nats::connect(server.client_url()).await.unwrap();
rx.await.unwrap();
let total = std::time::Instant::now();
for _ in 0..100 {
let now = std::time::Instant::now();
let request = client.request("request".into(), "".into()).await.unwrap();
let elapsed = now.elapsed();
println!("request took: {:?}", elapsed);
}
println!("took: {:?}", total.elapsed());
} and run it against your branch and main. main
this PR running 1 test
request took: 1.636792ms
request took: 2.731041ms
request took: 1.58475ms
request took: 1.566958ms
request took: 2.643208ms
request took: 1.454792ms
request took: 2.466917ms
request took: 1.493792ms
request took: 2.574417ms
request took: 1.496417ms
request took: 2.665875ms
request took: 1.534542ms
request took: 2.622291ms
request took: 2.609041ms
request took: 1.47ms
request took: 2.647458ms
request took: 1.451333ms
request took: 2.544708ms
request took: 2.532542ms
request took: 2.517083ms
request took: 2.507375ms
request took: 2.603041ms
request took: 2.570875ms
request took: 1.361167ms
request took: 2.481333ms
request took: 2.571875ms
request took: 2.548333ms
request took: 2.52175ms
request took: 2.557666ms
request took: 1.356833ms
request took: 2.445416ms
request took: 2.476167ms
request took: 2.543875ms
request took: 2.538417ms
request took: 2.460625ms
request took: 2.619833ms
request took: 2.529833ms
request took: 1.352584ms
request took: 2.525375ms
request took: 2.547ms
request took: 2.559417ms
request took: 1.445042ms
request took: 2.46875ms
request took: 1.389834ms
request took: 2.439042ms
request took: 2.509292ms
request took: 2.513542ms
request took: 2.538291ms
request took: 2.528375ms
request took: 2.490459ms
request took: 2.564542ms
request took: 1.363833ms
request took: 2.516792ms
request took: 2.544042ms
request took: 2.559834ms
request took: 1.405125ms
request took: 2.496167ms
request took: 2.517917ms
request took: 2.500666ms
request took: 2.473292ms
request took: 2.540166ms
request took: 2.5195ms
request took: 2.458042ms
request took: 2.512125ms
request took: 2.523209ms
request took: 1.33225ms
request took: 2.49025ms
request took: 2.469333ms
request took: 2.486875ms
request took: 2.546042ms
request took: 2.488375ms
request took: 2.522208ms
request took: 2.511334ms
request took: 2.491667ms
request took: 2.538208ms
request took: 2.559042ms
request took: 2.411417ms
request took: 2.527666ms
request took: 2.467333ms
request took: 2.443625ms
request took: 2.483ms
request took: 2.47725ms
request took: 2.533583ms
request took: 2.491792ms
request took: 2.444958ms
request took: 2.4395ms
request took: 2.453959ms
request took: 2.480542ms
request took: 2.507917ms
request took: 2.518084ms
request took: 2.484042ms
request took: 2.501208ms
request took: 2.491292ms
request took: 2.497958ms
request took: 2.494875ms
request took: 2.515584ms
request took: 2.519917ms
request took: 2.520375ms
request took: 2.465583ms
request took: 2.533583ms
took: 234.451083ms Removing the flush from publish on response makes them both as slow as this PR code. This is a huge difference. So, we are well aware of the fact that there are optimizations to flushing/writing mechanism possible, but this does not seem to be the way. Also note, that most, if not all NATS clients have manual flush option for exactly this reason. |
If you want to write at the highest possible speed remove the timer diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs
index d516697..1a3345d 100644
--- a/async-nats/src/lib.rs
+++ b/async-nats/src/lib.rs
@@ -460,7 +460,7 @@ impl ConnectionHandler {
}
if !self.handler.is_flushing && self.handler.connection.needs_flush() {
- self.handler.is_flushing = self.handler.flush_interval.poll_tick(cx).is_ready();
+ self.handler.is_flushing = true;
}
if self.handler.is_flushing { |
0d9c518
to
4d4c294
Compare
Ah, you're right. @paolobarbolini I did a quick check for how flush-less approach could work here #1070 |
I liked the idea of #1070 of only flushing if there's nothing left to write. I've incorporated it as an experiment. It doesn't seem to make much of a difference with the current benchmarks. I've even tried removing Updated benchmarks against EDIT: redid the benchmarks with #1073 on both branches benchmark
|
Try it with a larger buffer, e.g #971 has been showing night and day difference for me. |
Making |
0272762
to
bb0b63b
Compare
Updated benchmarks results
|
bb0b63b
to
db07c2e
Compare
5f7c74c
to
4acb7c8
Compare
I've added some docs |
4acb7c8
to
200ca88
Compare
200ca88
to
cdd3f8a
Compare
cdd3f8a
to
9e0be06
Compare
Benchmarks on a Ryzen 5900X server main (8661572)Results
First part of the PR (5a73aac) against mainThis was the original scope of the PR. It allows concurrent read, write and flush operations, removes flush Results
Full PR (9e0be06) against the first stepThese are the extra 3 commits I did at the end to further optimize writes. Small writes get flattened, while big ones bypass all buffering (at least on our side). When the connection supports it we do vectored writes. All of this allowed us to remove Results
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, I'm liking it. Going to avoid nitpicking as this is rather huge.
There are some unwraps in the write macros, but since this is coming from internal, I suppose its fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, did a final read-through, going to be an approval from me 🎉
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did run few more benches on linux with solid results.
That concludes work on that PR.
LGTM!
Great effort, thanks for that!
Sorry you had to wait quite a bit.
9e0be06
to
5f23beb
Compare
🥳. I've fixed the merge conflict |
Can you rebase the branch so commit messages are consistent with others in the repo? (Capitalize first letter, simple imperative mood) @paolobarbolini |
5f23beb
to
264d667
Compare
264d667
to
d11950a
Compare
This is an attempt at refactoring
Connection
andConnectionHandler
to usepoll
instead ofasync
.await
syntax.Advantages:
Poll::Pending
the other operations can be polledselect!
from a very critical path. The issues with it are the following:Future
s polled by it's branches to be cancel safe. Cancel safety requiresDrop
ping of aFuture
to be a no-op. While this seemed to be true it's easy to get wrong.Left to implement:
flush
Fixes #905
Fixes #923
Fixes #582
Closes #869
Closes #935
Closes #971
Closes #1070