-
Notifications
You must be signed in to change notification settings - Fork 254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc: add full support reconnecting rpc client #1505
Conversation
} | ||
} | ||
|
||
impl<Hash: BlockHash> Stream for FollowStreamFinalizedHeads<Hash> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me!As a nit I'd be tempted to put it in a different file like the stream in storage_items.rs
is (or put some of this stuff into a utils file or something) since it's not really a part of the follow stream driver code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yepp, the thingy was that the FollowStreamSubscription wasn't exported outside this and I placed here to avoid sharing it with pub crate
.
I can move it if it ok to make it pub crate :)
// Track the last block to determine how many blocks that were missed when reconnecting. | ||
let mut last_seen_block = None; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Being just an example, I wonder if it's worth this extra complexity to log missing block info vs just printing block details and continuing if disconnected_will_reconnect error :)
I don't mind much either way though!
Co-authored-by: James Wilson <james@jsdw.me>
let backend: LegacyBackend<PolkadotConfig> = | ||
LegacyBackend::builder().build(RpcClient::new(rpc.clone())); | ||
|
||
let api: OnlineClient<PolkadotConfig> = OnlineClient::from_backend(Arc::new(backend)).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as a note: in the "default" case, one can also just to something like OnlineClient::from_rpc(rpc_client)
, which might be an easier thing to point them at in an example.
(And perhaps they can then figure out how to use custom backends in a separate example)
subxt/src/backend/utils.rs
Outdated
F: FnMut() -> ResubscribeFuture<R> + Send + 'static + Clone, | ||
R: Send + 'static, | ||
{ | ||
loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a shame that we are sortof duplicating the try logic in an outer and then inner loop here, I think just to return the initial error in the Result
before we are inside a stream and errors come in the streams.
I wonder if it makes sense to change the signature of this fn to return -> StreamOfResults<R>
(ie no outer Result
) and then the body can be simpler. This sortof makes sense to me anyway because it's a bit odd that we would care about catching the initial future failing but then be happy with any other errors coming through the stream instead.
eg:
// future that will resubscribe to the stream, internally retrying
// if any DisconnectedWillReconnect (I wonder whether `retry` could be
// used to make this bit even shorter too..)
let resubscribe = Box::new(move || {...});
// The extra Box is to encapsulate the retry subscription type
return Ok(StreamOf::new(Box::pin(RetrySubscription {
state: PendingOrStream::Stream(v),
resubscribe,
})));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe best to leave as is for this PR though because changing it above might hjave knock-on effects on the signatures of backend fns or whatever. Just something to ponder!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you will be happier now after I removed the !DRY stuff
@@ -431,6 +506,11 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> { | |||
return Poll::Ready(Some(Ok(keys))); | |||
} | |||
Err(e) => { | |||
if e.is_disconnected_will_reconnect() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps worth a comment, because this is doing something a bit weird: if a future returns an Err(DisconnectedWillReconnect)
, we expect to keep polling it and it will internally retry itself :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, changed it to continue the loop instead
/// A stream that subscribes to finalized blocks | ||
/// and indicates whether a block was missed if was restarted. | ||
#[derive(Debug)] | ||
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where's the type that this is using that stops it from being able to move out? I think I'm missing it :D
/// and indicates whether a block was missed if was restarted. | ||
#[derive(Debug)] | ||
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> { | ||
stream: FollowStreamDriverSubscription<Hash>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type @jsdw ☝️
I could make it generic but I prefer not to :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that struct is pub
in this file though, so we could import it from a separate file OK I think? :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a pita because the test helpers are not accessible outside this module :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see! either way it's all good and we can always shuffle stuff about in the future or whatever anyways!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice job @niklasad1!
I left some nits and suggestions but nothing that would block it merging :)
This PR improves the reconnecting rpc client so that it works with the unstable RPC backend, which requires re-starting
chainhead_follow
along with other stuff that is required to update the state when a new connection is established.Close #1423
Some notes about the PR:
retry, retry_stream
to retry calls, and subscriptions if one wants to retry such calls if the connection was lost.author_submitAndWatch
are not retried, and it's up to the caller to handle such things.