Skip to content

Commit

Permalink
[PLA-2115] Adds relaychain transactions functionality (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardocustodio authored Dec 10, 2024
1 parent 85943df commit 1556eb5
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wallet-daemon"
version = "2.0.3"
version = "2.0.4"
edition = "2021"

[profile.release]
Expand Down
96 changes: 63 additions & 33 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,43 @@ use subxt::backend::rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient
use subxt::{OnlineClient, PolkadotConfig};
use wallet_daemon::config_loader::{load_config, load_wallet};
use wallet_daemon::{
set_multitenant, write_seed, DeriveWalletJob, SubscriptionJob, TransactionJob,
set_multitenant, write_seed, DeriveWalletJob, SubscriptionJob, SubscriptionParams,
TransactionJob,
};

async fn setup_client(
url: &str,
) -> (
Arc<OnlineClient<PolkadotConfig>>,
SubscriptionJob,
Arc<SubscriptionParams>,
) {
let rpc_client = RpcClient::builder()
.retry_policy(
ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(10))
.take(3),
)
.build(url.to_string())
.await
.unwrap();

let online_client = OnlineClient::<PolkadotConfig>::from_rpc_client(rpc_client)
.await
.unwrap();

let runtime_updater = online_client.updater();
tokio::spawn(async move {
let _ = runtime_updater.perform_runtime_updates().await;
});

let client = Arc::new(online_client);
let subscription = SubscriptionJob::create_job(Arc::clone(&client));
let sub_params = subscription.get_params();

(client, subscription, sub_params)
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().skip(1).collect();
Expand All @@ -23,40 +57,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

let (keypair, matrix_url, _relay_url, platform_url, platform_token) =
let (keypair, matrix_url, relay_url, platform_url, platform_token) =
load_wallet(load_config()).await;
let signing = hex::encode(keypair.public_key().0);

tracing_subscriber::fmt::init();

// Check if we are connecting to a multitenant platform
set_multitenant(signing, platform_url.clone(), platform_token.clone()).await;
// Setup matrix client and parameters
let (matrix_client, matrix_subscription, matrix_sub_params) = setup_client(&matrix_url).await;
// Setup relay client and parameters
let (relay_client, relay_subscription, relay_sub_params) = setup_client(&relay_url).await;

let rpc_client = RpcClient::builder()
.retry_policy(
ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(10))
.take(3),
)
.build(matrix_url.clone())
.await
.unwrap();

let chain_client = OnlineClient::<PolkadotConfig>::from_rpc_client(rpc_client)
.await
.unwrap();

let update_task = chain_client.updater();
tokio::spawn(async move {
let _ = update_task.perform_runtime_updates().await;
});

let chain_client = Arc::new(chain_client);
let subscription_job = SubscriptionJob::create_job(Arc::clone(&chain_client));
let params = subscription_job.get_params();
let (matrix_tx_poller, matrix_tx_processor) = TransactionJob::create_job(
Arc::clone(&matrix_client),
Arc::clone(&matrix_sub_params),
keypair.clone(),
platform_url.clone(),
platform_token.clone(),
);

let (transaction_poller, transaction_processor) = TransactionJob::create_job(
Arc::clone(&chain_client),
Arc::clone(&params),
let (relay_tx_poller, relay_tx_processor) = TransactionJob::create_job(
Arc::clone(&relay_client),
Arc::clone(&relay_sub_params),
keypair.clone(),
platform_url.clone(),
platform_token.clone(),
Expand All @@ -66,14 +89,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
DeriveWalletJob::create_job(keypair, platform_url, platform_token);

tokio::select! {
_ = transaction_poller.start() => {}
_ = transaction_processor.start() => {}
_ = wallet_poller.start() => {}
_ = wallet_processor.start() => {}
r = subscription_job.start() => {
r = relay_subscription.start() => {
let err = r.unwrap_err();
tracing::error!("Subscription job failed: {:?}", err);
}
m = matrix_subscription.start() => {
let err = m.unwrap_err();
tracing::error!("Subscription job failed: {:?}", err);
}

_ = relay_tx_poller.start() => {}
_ = relay_tx_processor.start() => {}
_ = matrix_tx_poller.start() => {}
_ = matrix_tx_processor.start() => {}
_ = wallet_poller.start() => {}
_ = wallet_processor.start() => {}
}

Ok(())
Expand Down
84 changes: 78 additions & 6 deletions src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::{panic};
use std::{fmt, panic};
use subxt::client::ClientRuntimeUpdater;
use subxt::dynamic::At;
use subxt::ext::subxt_core;
use subxt::{OnlineClient, PolkadotConfig};
use subxt_core::config::substrate;
Expand All @@ -11,6 +13,48 @@ pub struct SubscriptionJob {
params: Arc<SubscriptionParams>,
}

#[derive(Debug, Clone)]
pub enum Network {
EnjinRelay,
CanaryRelay,
EnjinMatrix,
CanaryMatrix,
}

impl Network {
pub fn to_query_var(&self) -> Option<String> {
match self {
Network::EnjinRelay => Some("relay".to_string()),
Network::CanaryRelay => Some("relay".to_string()),
_ => Some("matrix".to_string()),
}
}
}

impl fmt::Display for Network {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Network::EnjinRelay => write!(f, "Enjin Relaychain"),
Network::CanaryRelay => write!(f, "Canary Relaychain"),
Network::EnjinMatrix => write!(f, "Enjin Matrixchain"),
Network::CanaryMatrix => write!(f, "Canary Matrixchain"),
}
}
}

impl FromStr for Network {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"enjin" => Ok(Network::EnjinRelay),
"matrix-enjin" => Ok(Network::EnjinMatrix),
"canary" => Ok(Network::CanaryRelay),
"matrix" => Ok(Network::CanaryMatrix),
_ => Err(format!("Unknown network: {}", s)),
}
}
}

impl SubscriptionJob {
pub fn new(params: Arc<SubscriptionParams>) -> Self {
Self { params }
Expand All @@ -19,10 +63,20 @@ impl SubscriptionJob {
pub fn create_job(rpc: Arc<OnlineClient<PolkadotConfig>>) -> SubscriptionJob {
let block_header = Arc::new(Mutex::new(None));
let spec_version = Arc::new(Mutex::new(None));

let system_version = rpc
.constants()
.at(&subxt::dynamic::constant("System", "Version"))
.unwrap();
let system_version = system_version.to_value().unwrap();
let spec_name = system_version.at("spec_name").unwrap();
let network = Network::from_str(spec_name.as_str().unwrap()).unwrap();

let subscription = Arc::new(SubscriptionParams {
rpc,
block_header: Arc::clone(&block_header),
spec_version: Arc::clone(&spec_version),
network: Arc::clone(&Arc::new(network)),
});

SubscriptionJob::new(subscription)
Expand Down Expand Up @@ -51,6 +105,7 @@ pub struct SubscriptionParams {
rpc: Arc<OnlineClient<PolkadotConfig>>,
block_header: Arc<Mutex<Option<substrate::SubstrateHeader<u32, substrate::BlakeTwo256>>>>,
spec_version: Arc<Mutex<Option<u32>>>,
network: Arc<Network>,
}

impl SubscriptionParams {
Expand All @@ -64,7 +119,11 @@ impl SubscriptionParams {
match updater.apply_update(update) {
Ok(()) => {
*spec_version = Some(version);
tracing::info!("Upgrade to specVersion: {} successful", version)
tracing::info!(
"{} has been upgraded to specVersion {} successfully",
self.network,
version
)
}
Err(e) => match *spec_version {
Some(v) => {
Expand All @@ -73,14 +132,19 @@ impl SubscriptionParams {
}

tracing::error!(
"Upgrade to specVersion {} failed {:?}. Please restart your daemon.",
"{} has failed to upgrade to specVersion {} with error {:?}. Please restart your daemon.",
self.network,
version,
e
);
}
None => {
*spec_version = Some(version);
tracing::info!("Using specVersion {} to sign transactions", version);
tracing::info!(
"Using specVersion {} to sign transactions for {}",
version,
self.network
);
}
},
};
Expand All @@ -97,15 +161,19 @@ impl SubscriptionParams {
Ok(b) => b,
Err(e) => {
if e.is_disconnected_will_reconnect() {
tracing::warn!("Lost connection with the RPC node, reconnecting...");
tracing::warn!(
"Lost connection with {} rpc node, reconnecting...",
self.network
);
}

continue;
}
};

tracing::info!(
"Current finalized block #{}: {}",
"Current finalized block for {}: #{} ({})",
self.network,
block.number(),
block.hash()
);
Expand All @@ -123,4 +191,8 @@ impl SubscriptionParams {

block_header_lock.clone().unwrap()
}

pub fn get_network(&self) -> Arc<Network> {
Arc::clone(&self.network)
}
}
14 changes: 12 additions & 2 deletions src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::graphql::{mark_and_list_pending_transactions, MarkAndListPendingTransactions};
use crate::subscription::Network;
use crate::{platform_client, SubscriptionParams};
use backoff::exponential::ExponentialBackoff;
use backoff::SystemClock;
Expand Down Expand Up @@ -62,6 +63,7 @@ pub struct TransactionJob {
sender: Sender<Vec<TransactionRequest>>,
platform_url: String,
platform_token: String,
network: Arc<Network>,
}

impl TransactionJob {
Expand All @@ -70,12 +72,14 @@ impl TransactionJob {
sender: Sender<Vec<TransactionRequest>>,
platform_url: String,
platform_token: String,
network: Arc<Network>,
) -> Self {
Self {
client,
sender,
platform_url,
platform_token,
network,
}
}

Expand All @@ -87,13 +91,15 @@ impl TransactionJob {
platform_token: String,
) -> (TransactionJob, TransactionProcessor) {
let (sender, receiver) = tokio::sync::mpsc::channel(50_000);
let network = block_sub.get_network();

(
TransactionJob::new(
Client::new(),
sender,
platform_url.clone(),
platform_token.clone(),
network,
),
TransactionProcessor::new(
rpc,
Expand Down Expand Up @@ -128,7 +134,11 @@ impl TransactionJob {
}
Err(e) => {
if e.to_string() == NO_TRANSACTIONS_MSG {
tracing::info!("MarkAndListPendingTransactions: {}", NO_TRANSACTIONS_MSG);
tracing::info!(
"MarkAndListPendingTransactions: {} for {}",
NO_TRANSACTIONS_MSG,
self.network
);
} else {
tracing::error!("Error: {:?}", e);
}
Expand All @@ -142,7 +152,7 @@ impl TransactionJob {
) -> Result<Vec<TransactionRequest>, Box<dyn std::error::Error + Send + Sync>> {
let res = MarkAndListPendingTransactions::build_query(
mark_and_list_pending_transactions::Variables {
network: None,
network: self.network.to_query_var(),
after: None,
first: Some(TRANSACTION_PAGE_SIZE),
mark_as_processing: Some(true),
Expand Down

0 comments on commit 1556eb5

Please sign in to comment.