Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize chainHead methods #1538

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<Hash> FollowStream<Hash> {
let methods = methods.clone();
Box::pin(async move {
// Make the RPC call:
let stream = methods.chainhead_unstable_follow(true).await?;
let stream = methods.chainhead_v1_follow(true).await?;
// Extract the subscription ID:
let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else {
return Err(Error::Other(
Expand Down
2 changes: 1 addition & 1 deletion subxt/src/backend/unstable/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
let methods = methods.clone();
let fut: UnpinFut = Box::pin(async move {
// We ignore any errors trying to unpin at the moment.
let _ = methods.chainhead_unstable_unpin(&sub_id, hash).await;
let _ = methods.chainhead_v1_unpin(&sub_id, hash).await;
});
fut
});
Expand Down
8 changes: 4 additions & 4 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<T: Config> UnstableBackend<T> {

async move {
let res = methods
.chainhead_unstable_header(&sub_id, block_ref.hash())
.chainhead_v1_header(&sub_id, block_ref.hash())
.await
.transpose()?;

Expand Down Expand Up @@ -286,15 +286,15 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {

async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_unstable_header(&sub_id, at).await
self.methods.chainhead_v1_header(&sub_id, at).await
}

async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;

// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_unstable_body(&sub_id, at).await?;
let status = self.methods.chainhead_v1_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
Expand Down Expand Up @@ -645,7 +645,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let call_parameters = call_parameters.unwrap_or(&[]);
let status = self
.methods
.chainhead_unstable_call(&sub_id, at, method, call_parameters)
.chainhead_v1_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
Expand Down
79 changes: 35 additions & 44 deletions subxt/src/backend/unstable/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,118 +33,112 @@ impl<T: Config> UnstableRpcMethods<T> {
}
}

/// Subscribe to `chainHead_unstable_follow` to obtain all reported blocks by the chain.
/// Subscribe to `chainHead_v1_follow` to obtain all reported blocks by the chain.
///
/// The subscription ID can be used to make queries for the
/// block's body ([`chainhead_unstable_body`](UnstableRpcMethods::chainhead_unstable_follow)),
/// block's header ([`chainhead_unstable_header`](UnstableRpcMethods::chainhead_unstable_header)),
/// block's storage ([`chainhead_unstable_storage`](UnstableRpcMethods::chainhead_unstable_storage)) and submitting
/// runtime API calls at this block ([`chainhead_unstable_call`](UnstableRpcMethods::chainhead_unstable_call)).
/// block's body ([`chainHead_v1_body`](UnstableRpcMethods::chainhead_v1_follow)),
/// block's header ([`chainHead_v1_header`](UnstableRpcMethods::chainhead_v1_header)),
/// block's storage ([`chainHead_v1_storage`](UnstableRpcMethods::chainhead_v1_storage)) and submitting
/// runtime API calls at this block ([`chainHead_v1_call`](UnstableRpcMethods::chainhead_v1_call)).
///
/// # Note
///
/// When the user is no longer interested in a block, the user is responsible
/// for calling the [`chainhead_unstable_unpin`](UnstableRpcMethods::chainhead_unstable_unpin) method.
/// for calling the [`chainHead_v1_unpin`](UnstableRpcMethods::chainhead_v1_unpin) method.
/// Failure to do so will result in the subscription being stopped by generating the `Stop` event.
pub async fn chainhead_unstable_follow(
pub async fn chainhead_v1_follow(
&self,
with_runtime: bool,
) -> Result<FollowSubscription<T::Hash>, Error> {
let sub = self
.client
.subscribe(
"chainHead_unstable_follow",
"chainHead_v1_follow",
rpc_params![with_runtime],
"chainHead_unstable_unfollow",
"chainHead_v1_unfollow",
)
.await?;

Ok(FollowSubscription { sub, done: false })
}

/// Resumes a storage fetch started with chainHead_unstable_storage after it has generated an
/// Resumes a storage fetch started with chainHead_v1_storage after it has generated an
/// `operationWaitingForContinue` event.
///
/// Has no effect if the operationId is invalid or refers to an operation that has emitted a
/// `{"event": "operationInaccessible"` event, or if the followSubscription is invalid or stale.
pub async fn chainhead_unstable_continue(
pub async fn chainhead_v1_continue(
&self,
follow_subscription: &str,
operation_id: &str,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_continue",
"chainHead_v1_continue",
rpc_params![follow_subscription, operation_id],
)
.await?;

Ok(())
}

/// Stops an operation started with `chainHead_unstable_body`, `chainHead_unstable_call`, or
/// `chainHead_unstable_storage¦. If the operation was still in progress, this interrupts it.
/// Stops an operation started with `chainHead_v1_body`, `chainHead_v1_call`, or
/// `chainHead_v1_storage¦. If the operation was still in progress, this interrupts it.
/// If the operation was already finished, this call has no effect.
///
/// Has no effect if the `followSubscription` is invalid or stale.
pub async fn chainhead_unstable_stop_operation(
pub async fn chainhead_v1_stop_operation(
&self,
follow_subscription: &str,
operation_id: &str,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_stopOperation",
"chainHead_v1_stopOperation",
rpc_params![follow_subscription, operation_id],
)
.await?;

Ok(())
}

/// Call the `chainHead_unstable_body` method and return an operation ID to obtain the block's body.
/// Call the `chainHead_v1_body` method and return an operation ID to obtain the block's body.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_body(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_body(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<MethodResponse, Error> {
let response = self
.client
.request(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_body", rpc_params![subscription_id, hash])
.await?;

Ok(response)
}

/// Get the block's header using the `chainHead_unstable_header` method.
/// Get the block's header using the `chainHead_v1_header` method.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_header(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_header(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<Option<T::Header>, Error> {
// header returned as hex encoded SCALE encoded bytes.
let header: Option<Bytes> = self
.client
.request(
"chainHead_unstable_header",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_header", rpc_params![subscription_id, hash])
.await?;

let header = header
Expand All @@ -153,16 +147,16 @@ impl<T: Config> UnstableRpcMethods<T> {
Ok(header)
}

/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the block's storage.
/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the block's storage.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_storage(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_storage(
&self,
subscription_id: &str,
hash: T::Hash,
Expand All @@ -180,24 +174,24 @@ impl<T: Config> UnstableRpcMethods<T> {
let response = self
.client
.request(
"chainHead_unstable_storage",
"chainHead_v1_storage",
rpc_params![subscription_id, hash, items, child_key.map(to_hex)],
)
.await?;

Ok(response)
}

/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the runtime API result.
/// Call the `chainHead_v1_storage` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_call(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_call(
&self,
subscription_id: &str,
hash: T::Hash,
Expand All @@ -207,7 +201,7 @@ impl<T: Config> UnstableRpcMethods<T> {
let response = self
.client
.request(
"chainHead_unstable_call",
"chainHead_v1_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
)
.await?;
Expand All @@ -220,17 +214,14 @@ impl<T: Config> UnstableRpcMethods<T> {
/// # Note
///
/// The subscription ID is obtained from an open subscription created by
/// [`chainhead_unstable_follow`](UnstableRpcMethods::chainhead_unstable_follow).
pub async fn chainhead_unstable_unpin(
/// [`chainHead_v1_follow`](UnstableRpcMethods::chainhead_v1_follow).
pub async fn chainhead_v1_unpin(
&self,
subscription_id: &str,
hash: T::Hash,
) -> Result<(), Error> {
self.client
.request(
"chainHead_unstable_unpin",
rpc_params![subscription_id, hash],
)
.request("chainHead_v1_unpin", rpc_params![subscription_id, hash])
.await?;

Ok(())
Expand Down
8 changes: 2 additions & 6 deletions subxt/src/backend/unstable/storage_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<T: Config> StorageItems<T> {
// Subscribe to events and make the initial request to get an operation ID.
let follow_events = follow_handle.subscribe().events();
let status = methods
.chainhead_unstable_storage(&sub_id, at, queries, None)
.chainhead_v1_storage(&sub_id, at, queries, None)
.await?;
let operation_id: Arc<str> = match status {
MethodResponse::LimitReached => {
Expand All @@ -59,11 +59,7 @@ impl<T: Config> StorageItems<T> {
let operation_id = operation_id.clone();
let methods = methods.clone();

Box::pin(async move {
methods
.chainhead_unstable_continue(&sub_id, &operation_id)
.await
})
Box::pin(async move { methods.chainhead_v1_continue(&sub_id, &operation_id).await })
})
};

Expand Down
Loading
Loading