feat: add local/remote sub cache to publisher #1611
base: main
PR missing one of the required labels: {'dependencies', 'enhancement', 'documentation', 'bug', 'breaking-change', 'new feature', 'internal'}
a4e9b4d to 5455ecc
zenoh/src/api/session.rs
Outdated
@@ -2162,12 +2171,37 @@ impl SessionInner {
#[cfg(feature = "unstable")] source_info: SourceInfo,
attachment: Option<ZBytes>,
) -> ZResult<()> {
const NO_REMOTE_FLAG: u64 = 0b01;
Let's refactor `cache` to be an object with methods like `set_no_remotes()` etc. to make the code cleaner and more readable.
The issue is that `resolve_put` is shared with the `Session::put` workflow, in which there is no cache. Because I'm not sure that `resolve_put` is inlined, I've written the code in the "most efficient" way, using a dummy cached value. But using a `PublisherCache` type when there is no publisher is quite odd...
I will try to restructure the code
Need some brush-up
a25202e to e07d74d
I've extracted the caching logic into dedicated types. The result is 1-2% slower than the previous version, even after adding another optimization that avoids cloning the payload when there is no local subscriber, but I don't think we care, as it's still a lot more performant than the current
This reverts commit 1753654.
04abf79 to 22c15de
The `z_local_pub_sub_thr` example needs to be removed.
Disabling the cache in `WeakSession::send_declare` is not enough. For example, after connecting to a new router/peer, data is routed to this peer until the `Declare<Sub>` or `Declare<Final>` is received. So the cache should be invalidated on a new transport and invalidated again on `Declare<Sub>` or `Declare<Final>`.
All points where cache invalidation should occur need to be identified, and a notification to the `Session` needs to be established.
The current `Publisher::put` workflow always tries to check whether there are both local and remote subscribers, and these operations are costly: for example, more than 10% of the time is spent checking local subscribers. However, this information is known, or at least can easily be retrieved, and it could be used to save a lot of CPU in remote-only or local-only workflows.

The PR does this by adding a cache to the publisher that stores whether there are no remote and/or no local subscribers, in order to shortcut the `put` workflow. The cache is an atomic integer stored directly in the publisher, with one bit flag for each piece of information. Both the remote and local routing functions have been updated to return a boolean indicating whether there was a matching route/subscriber, so that the cache can be updated.

Also, in order to reset the cache when new remote/local subscribers are detected, the session keeps a `subscription_version` integer field, incremented each time there is a new subscriber (at the same place where matching listeners are notified). The version is also stored in the remaining bits of the cache, and if the two don't match, the cache is reset with the current version. To avoid having to handle version rollover, a 64-bit integer is used, which leaves 62 bits for the version in the cache (because of the two flag bits), which is more than enough.
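
To make the bit layout described above concrete, here is a minimal, self-contained sketch. It is not the code from this PR: only `NO_REMOTE_FLAG` and the `PublisherCache` name appear in the conversation, while `NO_LOCAL_FLAG`, the method names, the free `put` function, the routing closures, and the memory orderings are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Two low bits hold the flags; the upper 62 bits hold the version.
const NO_REMOTE_FLAG: u64 = 0b01;
const NO_LOCAL_FLAG: u64 = 0b10; // hypothetical name, mirrors NO_REMOTE_FLAG
const FLAG_BITS: u32 = 2;

/// Illustrative stand-in for the publisher-side cache (not zenoh's type).
pub struct PublisherCache(AtomicU64);

impl PublisherCache {
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Returns the cached flags, or 0 if they were recorded under a
    /// different subscription version (i.e. the cache is stale).
    pub fn flags(&self, version: u64) -> u64 {
        let word = self.0.load(Ordering::Relaxed);
        if word >> FLAG_BITS == version {
            word & (NO_REMOTE_FLAG | NO_LOCAL_FLAG)
        } else {
            0
        }
    }

    /// Stores the flags together with the version they were observed under.
    /// Assumes `version` fits in 62 bits.
    pub fn store(&self, version: u64, flags: u64) {
        self.0.store((version << FLAG_BITS) | flags, Ordering::Relaxed);
    }
}

/// Hypothetical put path: each routing closure returns `true` when at
/// least one matching route/subscriber was found.
pub fn put(
    cache: &PublisherCache,
    subscription_version: &AtomicU64,
    route_remote: impl FnOnce() -> bool,
    route_local: impl FnOnce() -> bool,
) {
    // Read the version before routing, so that a subscriber declared
    // during routing makes the stored word stale on the next put.
    let version = subscription_version.load(Ordering::Acquire);
    let cached = cache.flags(version);

    // `||` short-circuits: a set flag skips the corresponding routing call.
    let no_remote = cached & NO_REMOTE_FLAG != 0 || !route_remote();
    let no_local = cached & NO_LOCAL_FLAG != 0 || !route_local();

    let mut flags = 0;
    if no_remote {
        flags |= NO_REMOTE_FLAG;
    }
    if no_local {
        flags |= NO_LOCAL_FLAG;
    }
    cache.store(version, flags);
}
```

In this sketch, incrementing `subscription_version` (for instance with `fetch_add(1, Ordering::Release)`) is all that is needed to invalidate every publisher's cache, since the next `put` sees a version mismatch and routes in both directions again. On rollover: 2^62 is about 4.6 × 10^18, so even at one billion increments per second the version field would take roughly 146 years to wrap.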