From f5b33383da822c30cd1feea3ced19ab79367208e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 15 Dec 2023 11:11:12 +0100 Subject: [PATCH 1/6] fix(runtime update): wait until upgrade on chain --- subxt/src/backend/rpc/rpc_client.rs | 5 +-- subxt/src/client/online_client.rs | 61 ++++++++++++++++++++++++----- testing/test-runtime/build.rs | 5 +-- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/subxt/src/backend/rpc/rpc_client.rs b/subxt/src/backend/rpc/rpc_client.rs index 0e4bb62940..4302b9867e 100644 --- a/subxt/src/backend/rpc/rpc_client.rs +++ b/subxt/src/backend/rpc/rpc_client.rs @@ -230,10 +230,7 @@ impl Stream for RpcSubscription { mod jsonrpsee_helpers { pub use jsonrpsee::{ client_transport::ws::{Receiver, Sender, Url, WsTransportClientBuilder}, - core::{ - client::{Client, ClientBuilder}, - Error, - }, + core::{client::Client, Error}, }; /// Build WS RPC client from URL diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 338643ebea..147ab261f0 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -3,6 +3,7 @@ // see LICENSE for license details. use super::{OfflineClient, OfflineClientT}; +use crate::config::Header; use crate::custom_values::CustomValuesClient; use crate::{ backend::{ @@ -437,17 +438,13 @@ impl RuntimeUpdaterStream { Err(err) => return Some(Err(err)), }; - let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await { - Ok(block_ref) => block_ref, - Err(e) => return Some(Err(e)), + let at = match wait_for_runtime_upgrade(&self.client, &runtime_version).await { + Some(Ok(at)) => at, + Some(Err(err)) => return Some(Err(err)), + None => return None, }; - let metadata = match OnlineClient::fetch_metadata( - self.client.backend(), - latest_block_ref.hash(), - ) - .await - { + let metadata = match OnlineClient::fetch_metadata(self.client.backend(), at.hash()).await { Ok(metadata) => metadata, Err(err) => return Some(Err(err)), }; @@ -484,3 +481,49 @@ impl Update { &self.metadata } } + +/// Helper to wait until the runtime upgrade is applied on at finalized block. +async fn wait_for_runtime_upgrade( + client: &OnlineClient, + runtime_version: &RuntimeVersion, +) -> Option> { + use scale_value::At; + + let mut block_sub = match client.backend().stream_finalized_block_headers().await { + Ok(s) => s, + Err(err) => return Some(Err(err)), + }; + + let head = loop { + let (block, block_ref) = match block_sub.next().await? { + Ok(n) => n, + Err(err) => return Some(Err(err)), + }; + + let key: Vec = vec![]; + let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key); + + let chunk = match client.storage().at(block_ref).fetch(&addr).await { + Ok(Some(v)) => v, + Ok(None) => continue, + Err(e) => return Some(Err(e)), + }; + + let scale_val = match chunk.to_value() { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + + let spec_version = scale_val + .at("spec_version") + .and_then(|v| v.as_u128()) + .expect("specVersion should exist on RuntimeVersion; qed") + as u32; + + if spec_version >= runtime_version.spec_version { + break block; + } + }; + + Some(Ok(head)) +} diff --git a/testing/test-runtime/build.rs b/testing/test-runtime/build.rs index 23643e491d..9a6ae962b6 100644 --- a/testing/test-runtime/build.rs +++ b/testing/test-runtime/build.rs @@ -103,10 +103,7 @@ async fn run() { mod client { pub use jsonrpsee::{ client_transport::ws::{Receiver, Sender, Url, WsTransportClientBuilder}, - core::{ - client::{Client, ClientBuilder}, - Error, - }, + core::{client::Client, Error}, }; pub use jsonrpsee::core::{client::ClientT, rpc_params}; From b02a8af52c49f64bcf9d8d7e2ba7dce00916d9b7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 4 Jan 2024 16:29:52 +0100 Subject: [PATCH 2/6] address grumbles --- subxt/src/client/online_client.rs | 37 +++++++++++++++++-------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 147ab261f0..da0adb7fb9 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -3,13 +3,12 @@ // see LICENSE for license details. use super::{OfflineClient, OfflineClientT}; -use crate::config::Header; use crate::custom_values::CustomValuesClient; use crate::{ backend::{ legacy::LegacyBackend, rpc::RpcClient, Backend, BackendExt, RuntimeVersion, StreamOfResults, }, - blocks::BlocksClient, + blocks::{BlockRef, BlocksClient}, constants::ConstantsClient, error::Error, events::EventsClient, @@ -438,11 +437,11 @@ impl RuntimeUpdaterStream { Err(err) => return Some(Err(err)), }; - let at = match wait_for_runtime_upgrade(&self.client, &runtime_version).await { - Some(Ok(at)) => at, - Some(Err(err)) => return Some(Err(err)), - None => return None, - }; + let at = + match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? { + Ok(at) => at, + Err(err) => return Some(Err(err)), + }; let metadata = match OnlineClient::fetch_metadata(self.client.backend(), at.hash()).await { Ok(metadata) => metadata, @@ -483,10 +482,10 @@ impl Update { } /// Helper to wait until the runtime upgrade is applied on at finalized block. -async fn wait_for_runtime_upgrade( +async fn wait_runtime_upgrade_in_finalized_block( client: &OnlineClient, runtime_version: &RuntimeVersion, -) -> Option> { +) -> Option, Error>> { use scale_value::At; let mut block_sub = match client.backend().stream_finalized_block_headers().await { @@ -494,8 +493,8 @@ async fn wait_for_runtime_upgrade( Err(err) => return Some(Err(err)), }; - let head = loop { - let (block, block_ref) = match block_sub.next().await? { + let block_ref = loop { + let (_, block_ref) = match block_sub.next().await? { Ok(n) => n, Err(err) => return Some(Err(err)), }; @@ -503,7 +502,7 @@ async fn wait_for_runtime_upgrade( let key: Vec = vec![]; let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key); - let chunk = match client.storage().at(block_ref).fetch(&addr).await { + let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await { Ok(Some(v)) => v, Ok(None) => continue, Err(e) => return Some(Err(e)), @@ -514,16 +513,20 @@ async fn wait_for_runtime_upgrade( Err(e) => return Some(Err(e)), }; - let spec_version = scale_val + let Some(Ok(spec_version)) = scale_val .at("spec_version") .and_then(|v| v.as_u128()) - .expect("specVersion should exist on RuntimeVersion; qed") - as u32; + .map(|v| u32::try_from(v)) + else { + return Some(Err(Error::Other( + "Decoding `RuntimeVersion::spec_version` as u32 failed".to_string(), + ))); + }; if spec_version >= runtime_version.spec_version { - break block; + break block_ref; } }; - Some(Ok(head)) + Some(Ok(block_ref)) } From 27937518fe804a6f136d68f91a5f883bec022981 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 4 Jan 2024 17:41:28 +0100 Subject: [PATCH 3/6] Update subxt/src/client/online_client.rs --- subxt/src/client/online_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index da0adb7fb9..fac5436a60 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -516,7 +516,7 @@ async fn wait_runtime_upgrade_in_finalized_block( let Some(Ok(spec_version)) = scale_val .at("spec_version") .and_then(|v| v.as_u128()) - .map(|v| u32::try_from(v)) + .map(u32::try_from) else { return Some(Err(Error::Other( "Decoding `RuntimeVersion::spec_version` as u32 failed".to_string(), From a294cb38ec015139ef39e0a370e29d5ccad731d3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 4 Jan 2024 21:10:12 +0100 Subject: [PATCH 4/6] Update subxt/src/client/online_client.rs --- subxt/src/client/online_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index fac5436a60..3350b9c786 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -523,7 +523,7 @@ async fn wait_runtime_upgrade_in_finalized_block( ))); }; - if spec_version >= runtime_version.spec_version { + if spec_version > runtime_version.spec_version { break block_ref; } }; From 9388915dbedef70ade29918bfce355b1931802e7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 5 Jan 2024 19:19:52 +0100 Subject: [PATCH 5/6] fix nits and debug logs --- subxt/src/client/online_client.rs | 41 ++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index da0adb7fb9..108cffbcb5 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -3,6 +3,7 @@ // see LICENSE for license details. use super::{OfflineClient, OfflineClientT}; +use crate::config::Header; use crate::custom_values::CustomValuesClient; use crate::{ backend::{ @@ -430,13 +431,31 @@ pub struct RuntimeUpdaterStream { impl RuntimeUpdaterStream { /// Wait for the next runtime update. pub async fn next(&mut self) -> Option> { - let maybe_runtime_version = self.stream.next().await?; - - let runtime_version = match maybe_runtime_version { + let runtime_version = match self.stream.next().await? { Ok(runtime_version) => runtime_version, Err(err) => return Some(Err(err)), }; + // Remove this block before merging. + { + let b = self + .client + .backend() + .latest_finalized_block_ref() + .await + .unwrap(); + + let h = self + .client + .backend() + .block_header(b.hash()) + .await + .unwrap() + .unwrap(); + + tracing::info!("Runtime upgrade initiated at #{}", h.number().into()); + } + let at = match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? { Ok(at) => at, @@ -494,7 +513,7 @@ async fn wait_runtime_upgrade_in_finalized_block( }; let block_ref = loop { - let (_, block_ref) = match block_sub.next().await? { + let (b, block_ref) = match block_sub.next().await? { Ok(n) => n, Err(err) => return Some(Err(err)), }; @@ -502,9 +521,14 @@ async fn wait_runtime_upgrade_in_finalized_block( let key: Vec = vec![]; let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key); + // The storage `system::lastRuntimeUpgrade` should always have a version + // + // let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await { Ok(Some(v)) => v, - Ok(None) => continue, + Ok(None) => { + continue; + } Err(e) => return Some(Err(e)), }; @@ -523,8 +547,13 @@ async fn wait_runtime_upgrade_in_finalized_block( ))); }; - if spec_version >= runtime_version.spec_version { + // We are waiting for the chain to have the same spec version + // as sent out via the runtime subscription. + if spec_version == runtime_version.spec_version { + tracing::info!("block #{} new runtime", b.number().into()); break block_ref; + } else { + tracing::info!("block #{} no new runtime", b.number().into()); } }; From 3c4a34acd613c8bc28662f18f3aeeb30c6715356 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 8 Jan 2024 12:14:04 +0100 Subject: [PATCH 6/6] remove debug logs --- subxt/src/client/online_client.rs | 33 ++++--------------------------- 1 file changed, 4 insertions(+), 29 deletions(-) diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index d4e6314fd0..cb45e56796 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -3,7 +3,6 @@ // see LICENSE for license details. use super::{OfflineClient, OfflineClientT}; -use crate::config::Header; use crate::custom_values::CustomValuesClient; use crate::{ backend::{ @@ -436,26 +435,6 @@ impl RuntimeUpdaterStream { Err(err) => return Some(Err(err)), }; - // Remove this block before merging. - { - let b = self - .client - .backend() - .latest_finalized_block_ref() - .await - .unwrap(); - - let h = self - .client - .backend() - .block_header(b.hash()) - .await - .unwrap() - .unwrap(); - - tracing::info!("Runtime upgrade initiated at #{}", h.number().into()); - } - let at = match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? { Ok(at) => at, @@ -513,7 +492,7 @@ async fn wait_runtime_upgrade_in_finalized_block( }; let block_ref = loop { - let (b, block_ref) = match block_sub.next().await? { + let (_, block_ref) = match block_sub.next().await? { Ok(n) => n, Err(err) => return Some(Err(err)), }; @@ -521,13 +500,12 @@ async fn wait_runtime_upgrade_in_finalized_block( let key: Vec = vec![]; let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key); - // The storage `system::lastRuntimeUpgrade` should always have a version - // - // let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await { Ok(Some(v)) => v, Ok(None) => { - continue; + // The storage `system::lastRuntimeUpgrade` should always exist. + // + unreachable!("The storage item `system::lastRuntimeUpgrade` should always exist") } Err(e) => return Some(Err(e)), }; @@ -550,10 +528,7 @@ async fn wait_runtime_upgrade_in_finalized_block( // We are waiting for the chain to have the same spec version // as sent out via the runtime subscription. if spec_version == runtime_version.spec_version { - tracing::info!("block #{} new runtime", b.number().into()); break block_ref; - } else { - tracing::info!("block #{} no new runtime", b.number().into()); } };