ref(actix): Migrate the RelayCache actor #1485
Conversation
relays: HashMap::new(),
relay_channels: HashMap::new(),
senders: HashMap::new(),
fetch_channel: mpsc::channel(1),
We need to spawn a long(er) running background task to fetch relay infos and insert them into our map. It cannot hold mutable access to the map during that time, since there can be get requests in between.

My solution is to have a dedicated `mpsc` channel to update the internal state when the fetch result is ready. Since there can only be a single fetch at any given time, this channel can have a bounded capacity of 1.

An alternative solution would be an optional future similar to `SleepHandle`. I have an experimental implementation of that here, although I don't think it is any better.
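A minimal sketch of this pattern, assuming tokio's bounded `mpsc` and a `tokio::select!` service loop; the names (`Service`, `FetchResult`, `spawn_fetch`, ...) are placeholders and not the actual `RelayCacheService` code:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
struct FetchResult; // stand-in for the result of the upstream query

struct Service {
    // Only one fetch is in flight at any time, so a capacity of 1 suffices.
    fetch_channel: (mpsc::Sender<FetchResult>, mpsc::Receiver<FetchResult>),
}

impl Service {
    fn new() -> Self {
        Self {
            fetch_channel: mpsc::channel(1),
        }
    }

    /// Spawns the long(er) running fetch without borrowing the internal map.
    fn spawn_fetch(&self) {
        let tx = self.fetch_channel.0.clone();
        tokio::spawn(async move {
            // ... query the upstream here ...
            let result = FetchResult;
            tx.send(result).await.ok();
        });
    }

    /// Folds the fetch result back into the internal state. This runs on the
    /// service loop, which has exclusive access to the state.
    fn handle_fetch_result(&mut self, _result: FetchResult) {
        // insert the fetched relay infos into the map here
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                Some(result) = self.fetch_channel.1.recv() => {
                    self.handle_fetch_result(result)
                }
                // ... other arms: incoming messages and the refresh delay ...
            }
        }
    }
}
```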
pub struct RelayCacheService {
    static_relays: HashMap<RelayId, RelayInfo>,
    relays: HashMap<RelayId, RelayState>,
    senders: HashMap<RelayId, Vec<Sender<GetRelayResult>>>,
This is a substantial change. Instead of spawning a long-running future for every request call, we simply queue up the incoming message's `Sender`. If the relay info is already cached, this is skipped, of course.

This requires an allocation for each incoming (pending) request, although it should still cause less overhead than maintaining a Tokio task for each. While queued, these senders do not consume CPU.
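For illustration, the queueing scheme could look roughly like this, using tokio's `oneshot::Sender` and placeholder types (`RelayId`, `GetRelayResult`) as stand-ins for the real ones:

```rust
use std::collections::HashMap;

use tokio::sync::oneshot::Sender;

type RelayId = u64; // stand-in for the real RelayId
type GetRelayResult = Option<String>; // stand-in for the real result type

#[derive(Default)]
struct Cache {
    relays: HashMap<RelayId, GetRelayResult>,
    // One queue of waiting callers per relay that is currently being fetched.
    senders: HashMap<RelayId, Vec<Sender<GetRelayResult>>>,
}

impl Cache {
    fn get_or_fetch(&mut self, id: RelayId, sender: Sender<GetRelayResult>) {
        // Cached: answer immediately, nothing is queued.
        if let Some(result) = self.relays.get(&id) {
            sender.send(result.clone()).ok();
            return;
        }

        // Not cached: queue the sender; the first caller triggers the fetch.
        let waiting = self.senders.entry(id).or_default();
        let is_first = waiting.is_empty();
        waiting.push(sender);
        if is_first {
            // ... start the upstream fetch for `id` here ...
        }
    }

    /// Called when the fetch for `id` completes: cache the result and drain
    /// the queue by passing a clone of the value to every waiting sender.
    fn handle_fetch_result(&mut self, id: RelayId, result: GetRelayResult) {
        self.relays.insert(id, result.clone());
        for sender in self.senders.remove(&id).unwrap_or_default() {
            sender.send(result.clone()).ok();
        }
    }
}
```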
}
}

impl Handler<GetRelays> for RelayCache {
This has been moved to the call site in the `public_keys` endpoint handler, allowing us to avoid spawning yet another task over here.
if poll.is_ready() {
    self.reset();
}
At some point during the implementation, I forgot to reset the sleep handle after polling it successfully, which sent the service into an endless loop. That makes it a dangerous API: the usual expectation is that once the sleep has been polled successfully, it counts as handled and the reset happens on its own.
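For illustration, a sleep handle whose poll clears itself once it fires could look roughly like the following; `AutoResetSleep` and `poll_elapsed` are hypothetical names, not Relay's actual `SleepHandle`:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::time::{sleep_until, Instant, Sleep};

/// Hypothetical sleep handle that resets itself after firing, so callers
/// cannot forget to do so.
#[derive(Default)]
struct AutoResetSleep(Option<Pin<Box<Sleep>>>);

impl AutoResetSleep {
    /// Arms the sleep to fire at `deadline`.
    fn set(&mut self, deadline: Instant) {
        self.0 = Some(Box::pin(sleep_until(deadline)));
    }

    /// Resolves when the deadline elapses; while unset, it stays pending.
    /// The handle clears itself on completion, so no manual reset is needed.
    fn poll_elapsed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self.0.as_mut() {
            Some(sleep) => match sleep.as_mut().poll(cx) {
                Poll::Ready(()) => {
                    self.0 = None; // the reset happens automatically
                    Poll::Ready(())
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Pending,
        }
    }
}
```

With a shape like this, the `if poll.is_ready() { self.reset(); }` fix quoted above would not be needed in the caller.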
Please update the doc comment to explain that the reset happens automatically.
Force-pushed from bd8fd3c to 9bf64b4.
Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result),
() = &mut self.delay => self.fetch_relays(),
Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1),
else => break,
Should this `break` log a Sentry error? Relay is now very broken at that point: some things will not happen, and we'll need to debug why. It would be nice to have a Sentry error for this rather than having to look for the info message that says this thing has stopped.
This `break` is in fact dead code and cannot be reached, because `self.delay` will always be pending. We have to change that for all services in a follow-up, at some point in the future when we revise the shutdown strategy.
The `ProjectCache` and `RelayCache` actors both follow a common pattern: they maintain a map of cached objects that they populate asynchronously, and there is a message in their public interface to retrieve data from this cache. Since the `RelayCache` was ported in #1485, it keeps a list of senders for each waiting caller of that message. Once the cache entry has been resolved, it loops through all senders and passes a clone of the value on. This has the downside that it requires resources not only on the requesting side, but also in the cache actor itself.

A better approach is to break this into a two-step process:

1. The service creates a shared channel that will be populated once.
2. Recipients attach to the shared channel and wait for the response.

The recipient therefore first has to await the receive end of the shared channel, and then await the value from that channel. Of course, if the value is already in the cache, this can be skipped and the value can be sent directly to the recipient.

# Implementation Details

This PR implements this with two nested `oneshot` channels. The outer channel resolves an enum, which is either the result value or a shared inner channel that resolves the value. We implement this as service response behavior, so that the building blocks (senders, channels, ...) can be reused between multiple services. It can be used for some messages, while other messages keep regular async behavior. The response future type is opaque, so users of such a service do not notice the two-step behavior; they simply resolve the final value.

This PR updates the `RelayCacheService` to use this new response behavior. Internally, the service still needs to hold two maps:

- A map for the actual raw data that it pulls responses from. This data can be different or more elaborate than what's exposed via public messages.
- A map for open channels. Entries in this map can be removed as soon as the cache value is resolved, because after that the service can short-circuit and respond with the value directly.
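A minimal sketch of this two-step response, assuming tokio's `oneshot` and the `futures` crate's `Shared`; all names (`CacheService`, `CacheResponse`, `RelayInfo`, ...) are illustrative, and the outer `oneshot` that the service would use to deliver the enum to the caller is elided here, so this is not the actual Relay implementation:

```rust
use std::collections::HashMap;

use futures::future::{BoxFuture, FutureExt, Shared};
use tokio::sync::oneshot;

type RelayId = u64; // stand-in for the real RelayId

#[derive(Clone, Debug)]
struct RelayInfo; // stand-in for the real relay info payload

/// The shared inner channel: many callers can clone and await the same future.
type SharedInfo = Shared<BoxFuture<'static, Option<RelayInfo>>>;

/// The outer response: either the value is already cached, or the caller
/// attaches to the shared channel and waits for the fetch to complete.
enum CacheResponse {
    Ready(Option<RelayInfo>),
    Pending(SharedInfo),
}

#[derive(Default)]
struct CacheService {
    /// Raw cached data that responses are pulled from.
    cache: HashMap<RelayId, RelayInfo>,
    /// Open channels; entries are removed as soon as the value is resolved.
    channels: HashMap<RelayId, (oneshot::Sender<Option<RelayInfo>>, SharedInfo)>,
}

impl CacheService {
    fn get_or_fetch(&mut self, id: RelayId) -> CacheResponse {
        // Short-circuit: respond with the cached value directly.
        if let Some(info) = self.cache.get(&id) {
            return CacheResponse::Ready(Some(info.clone()));
        }

        // Otherwise hand out (a clone of) the shared channel for this relay.
        let shared = match self.channels.get(&id) {
            Some((_, shared)) => shared.clone(),
            None => {
                let (tx, rx) = oneshot::channel();
                // Drop the RecvError so the shared output is `Option<RelayInfo>`.
                let shared = rx.map(|res| res.ok().flatten()).boxed().shared();
                self.channels.insert(id, (tx, shared.clone()));
                // A real service would start the upstream fetch for `id` here.
                shared
            }
        };

        CacheResponse::Pending(shared)
    }

    /// Called when the background fetch delivers a result for `id`.
    fn resolve(&mut self, id: RelayId, info: Option<RelayInfo>) {
        if let Some(info) = info.clone() {
            self.cache.insert(id, info);
        }
        if let Some((tx, _)) = self.channels.remove(&id) {
            let _ = tx.send(info); // wakes every clone of the shared future
        }
    }
}

// Caller side (in reality this goes through a service message): the two-step
// behavior collapses into a single await.
async fn get_relay(service: &mut CacheService, id: RelayId) -> Option<RelayInfo> {
    match service.get_or_fetch(id) {
        CacheResponse::Ready(info) => info,
        CacheResponse::Pending(shared) => shared.await,
    }
}
```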
Updates the Relay cache to run as a Tokio service. The upstream query is moved to a background task with its own `mpsc` channel. All other operations, such as updating the internal map and retrieving relay information, run sequentially in the service. The service operates exactly as before, including a pre-existing race condition when a relay is queried while a fetch is running.
#skip-changelog