From 53b38b202c76b0d269926dbada596f51184fb88e Mon Sep 17 00:00:00 2001 From: Anton Yemelyanov Date: Thu, 22 Jun 2023 05:49:17 +0300 Subject: [PATCH] replace async Iterators with Streams wip - initial WASM wallet interface --- Cargo.lock | 32 ---- Cargo.toml | 32 ++-- wallet/cli/src/cli.rs | 105 +++-------- wallet/core/src/imports.rs | 7 +- wallet/core/src/iterator.rs | 33 ---- wallet/core/src/lib.rs | 2 +- wallet/core/src/runtime/account.rs | 31 +-- wallet/core/src/runtime/iterators.rs | 101 ---------- wallet/core/src/runtime/mod.rs | 2 - wallet/core/src/runtime/wallet.rs | 63 +++++-- wallet/core/src/storage/interface.rs | 34 +--- wallet/core/src/storage/keydata.rs | 9 + wallet/core/src/storage/local/cache.rs | 52 ++++++ wallet/core/src/storage/local/interface.rs | 90 ++------- wallet/core/src/storage/local/iterators.rs | 207 --------------------- wallet/core/src/storage/local/mod.rs | 3 +- wallet/core/src/storage/local/store.rs | 4 - wallet/core/src/storage/local/streams.rs | 154 +++++++++++++++ wallet/core/src/tx/transaction.rs | 8 +- wallet/core/src/wasm/account.rs | 161 ++++++++++++++++ wallet/core/src/wasm/mod.rs | 3 + wallet/core/src/wasm/tests.rs | 57 ++++++ wallet/core/src/wasm/wallet.rs | 15 ++ wasm/nodejs/test.js | 29 +++ 24 files changed, 625 insertions(+), 609 deletions(-) delete mode 100644 wallet/core/src/iterator.rs delete mode 100644 wallet/core/src/runtime/iterators.rs create mode 100644 wallet/core/src/storage/local/cache.rs delete mode 100644 wallet/core/src/storage/local/iterators.rs create mode 100644 wallet/core/src/storage/local/streams.rs create mode 100644 wallet/core/src/wasm/account.rs create mode 100644 wallet/core/src/wasm/mod.rs create mode 100644 wallet/core/src/wasm/tests.rs create mode 100644 wallet/core/src/wasm/wallet.rs create mode 100644 wasm/nodejs/test.js diff --git a/Cargo.lock b/Cargo.lock index 79deb9cf7..335f655e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5055,8 +5055,6 @@ dependencies = [ [[package]] name = "workflow-core" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6d92a715bf429af48ba8b97be93a6f0ab814840a86141d5abecc39a687b1a3d" dependencies = [ "async-channel", "async-std", @@ -5082,8 +5080,6 @@ dependencies = [ [[package]] name = "workflow-core-macros" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26be7bc2a0d23c990ed267c536e82d09b31f01ece2b3eb7a6240ecd29ddcce49" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -5098,8 +5094,6 @@ dependencies = [ [[package]] name = "workflow-dom" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa50f0834bb994da72c862f47c2e944df62a2b2f8d4afda1503ea48fddcac6e6" dependencies = [ "futures", "js-sys", @@ -5115,8 +5109,6 @@ dependencies = [ [[package]] name = "workflow-log" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f99f114d69ae9742aa0773259f14c4719ff1d4df48e967ea49b57e12ed32c09" dependencies = [ "cfg-if 1.0.0", "console", @@ -5131,8 +5123,6 @@ dependencies = [ [[package]] name = "workflow-macro-tools" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "574fd252dfd6484be3f16c6f2b76dbf92657d6eca8ddafff67bf92c517715117" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -5144,8 +5134,6 @@ dependencies = [ [[package]] name = "workflow-node" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec61304523b75a58c3dbd7a96dcf6b0c9b8fc78891b8c0c845a5794f34aa1983" dependencies = [ "futures", "js-sys", @@ -5163,8 +5151,6 @@ dependencies = [ [[package]] name = "workflow-panic-hook" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cffeb9c24d36625651cafdb4eb61dfff8afdbd38ae4edd8280450e8397bda5a" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen", @@ -5174,8 +5160,6 @@ dependencies = [ [[package]] name = "workflow-rpc" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3da096065c5370ec64b86efb6ad73271ea4bb439cec589083cfbad27dedf4416" dependencies = [ "ahash 0.8.3", "async-std", @@ -5204,8 +5188,6 @@ dependencies = [ [[package]] name = "workflow-rpc-macros" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91195047832ae4e7de3824722a4be6a333172e735900cca09cd536ad37811422" dependencies = [ "parse-variants", "proc-macro-error", @@ -5217,8 +5199,6 @@ dependencies = [ [[package]] name = "workflow-store" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a0d6e1a472f90ae42f728801b2d460b2160b9deb5d7c9c0b3bd646cf04420ae" dependencies = [ "async-std", "base64 0.21.0", @@ -5237,8 +5217,6 @@ dependencies = [ [[package]] name = "workflow-task" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e944ed147ac9a46fa17d0779f080d4058358dcc01f6e52415a264c54258435a" dependencies = [ "futures", "thiserror", @@ -5249,8 +5227,6 @@ dependencies = [ [[package]] name = "workflow-task-macros" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2b5a0e8f19fad09e069138e131d73cb56ac90de2f7f76641192b47df93810" dependencies = [ "convert_case 0.6.0", "parse-variants", @@ -5265,8 +5241,6 @@ dependencies = [ [[package]] name = "workflow-terminal" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1991405665d322b27dcba3c3aff3e1f2d3437a0b61c409beca58b6cf1b97cbc" dependencies = [ "async-std", "async-trait", @@ -5289,8 +5263,6 @@ dependencies = [ [[package]] name = "workflow-wasm" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd89a47abbb335d86e12fc6babb4851fcac5384e18150bd537100fa3a64f6272" dependencies = [ "cfg-if 1.0.0", "faster-hex", @@ -5310,8 +5282,6 @@ dependencies = [ [[package]] name = "workflow-wasm-macros" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "187b9521e689374a8e00521f77a2082d8b83d3db6bc3aa036bd63d024eb4fcbe" dependencies = [ "js-sys", "proc-macro-error", @@ -5324,8 +5294,6 @@ dependencies = [ [[package]] name = "workflow-websocket" version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe15e9fa1f8bebc410ba15b29205b5b3fd7238cd45e11b37af624d57fdcde2e" dependencies = [ "ahash 0.8.3", "async-std", diff --git a/Cargo.toml b/Cargo.toml index b1679d189..bdf25ba7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,23 +180,23 @@ pbkdf2 = { version = "0.12.1" } # pbkdf2 = { version = "0.11", default-features = false} # workflow dependencies -workflow-log = { version = "0.3.17" } -workflow-core = { version = "0.3.17" } -workflow-wasm = { version = "0.3.17" } -workflow-dom = { version = "0.3.17" } -workflow-rpc = { version = "0.3.17" } -workflow-node = { version = "0.3.17" } -workflow-store = { version = "0.3.17" } -workflow-terminal = { version = "0.3.17" } +# workflow-log = { version = "0.3.17" } +# workflow-core = { version = "0.3.17" } +# workflow-wasm = { version = "0.3.17" } +# workflow-dom = { version = "0.3.17" } +# workflow-rpc = { version = "0.3.17" } +# workflow-node = { version = "0.3.17" } +# workflow-store = { version = "0.3.17" } +# workflow-terminal = { version = "0.3.17" } -# workflow-log = { path = "../workflow-rs/log" } -# workflow-core = { path = "../workflow-rs/core" } -# workflow-wasm = { path = "../workflow-rs/wasm" } -# workflow-dom = { path = "../workflow-rs/dom" } -# workflow-rpc = { path = "../workflow-rs/rpc" } -# workflow-node = { path = "../workflow-rs/node" } -# workflow-store = { path = "../workflow-rs/store" } -# workflow-terminal = { path = "../workflow-rs/terminal" } +workflow-log = { path = "../workflow-rs/log" } +workflow-core = { path = "../workflow-rs/core" } +workflow-wasm = { path = "../workflow-rs/wasm" } +workflow-dom = { path = "../workflow-rs/dom" } +workflow-rpc = { path = "../workflow-rs/rpc" } +workflow-node = { path = "../workflow-rs/node" } +workflow-store = { path = "../workflow-rs/store" } +workflow-terminal = { path = "../workflow-rs/terminal" } [profile.release] lto = "thin" diff --git a/wallet/cli/src/cli.rs b/wallet/cli/src/cli.rs index 6a9daa8bc..50eadcd13 100644 --- a/wallet/cli/src/cli.rs +++ b/wallet/cli/src/cli.rs @@ -3,6 +3,7 @@ use crate::error::Error; use crate::helpers; use crate::result::Result; use async_trait::async_trait; +use futures::stream::{Stream, StreamExt, TryStreamExt}; use futures::*; use kaspa_wallet_core::accounts::gen0::import::*; use kaspa_wallet_core::imports::ToHex; @@ -121,13 +122,8 @@ impl WalletCli { let account_kind: AccountKind = select_variant(&term, "Please select account type", &mut argv).await?; - let prv_key_data_info = select_item( - &term, - "Please select private key", - &mut argv, - self.wallet.store().as_prv_key_data_store()?.iter().await?, - ) - .await?; + let prv_key_data_info = + select_item(&term, "Please select private key", &mut argv, self.wallet.keys().await?.err_into()).await?; self.create_account(prv_key_data_info.id, account_kind, term).await?; } @@ -188,19 +184,20 @@ impl WalletCli { let address = serde_json::from_str::
(address)?; let amount_sompi = helpers::kas_str_to_sompi(amount)?; - let password = Secret::new(term.ask(true, "Enter wallet password: ").await?.trim().as_bytes().to_vec()); + let wallet_secret = Secret::new(term.ask(true, "Enter wallet password: ").await?.trim().as_bytes().to_vec()); let mut payment_secret = Option::::None; if self.wallet.is_account_key_encrypted(&account).await?.is_some_and(|f| f) { payment_secret = Some(Secret::new(term.ask(true, "Enter payment password: ").await?.trim().as_bytes().to_vec())); } - let keydata = self.wallet.get_prv_key_data(password.clone(), &account.prv_key_data_id).await?; + let keydata = self.wallet.get_prv_key_data(wallet_secret.clone(), &account.prv_key_data_id).await?; if keydata.is_none() { return Err("It is read only wallet.".into()); } let abortable = Abortable::default(); let ids = - account.send(&address, amount_sompi, priority_fee_sompi, keydata.unwrap(), payment_secret, &abortable).await?; + // account.send(&address, amount_sompi, priority_fee_sompi, keydata.unwrap(), payment_secret, &abortable).await?; + account.send(&address, amount_sompi, priority_fee_sompi, wallet_secret, payment_secret, &abortable).await?; term.writeln(format!("\r\nSending {amount} KAS to {address}, tx ids:")); term.writeln(format!("{}\r\n", ids.into_iter().map(|a| a.to_string()).collect::>().join("\r\n"))); @@ -248,7 +245,7 @@ impl WalletCli { let mnemonic = helpers::ask_mnemonic(&term).await?; log_info!("Mnemonic: {:?}", mnemonic); } - "kaspanet" => { + "legacy" => { if exists_v0_keydata().await? { let import_secret = Secret::new( term.ask(true, "Enter the password for the wallet you are importing:") @@ -275,26 +272,14 @@ impl WalletCli { // ~~~ Action::List => { - let mut keys = self.wallet.keys(); - while let Some(keys) = keys.next().await? { - for key in keys { - term.writeln(format!("{key}")); - let mut accounts = self.wallet.accounts(Some(key.id)); - while let Some(accounts) = accounts.next().await? { - for account in accounts { - term.writeln(format!(" {}", account.get_ls_string())); - } - } + let mut keys = self.wallet.keys().await?; + while let Some(key) = keys.try_next().await? { + term.writeln(format!("{key}")); + let mut accounts = self.wallet.accounts(Some(key.id)).await?; + while let Some(account) = accounts.try_next().await? { + term.writeln(format!(" {}", account.get_ls_string())); } } - - // let map = self.wallet.account_map().locked_map(); - // for (prv_key_data_id, list) in map.iter() { - // term.writeln(format!("key: {}", prv_key_data_id.to_hex())); - // for account in list.iter() { - // term.writeln(account.get_ls_string()); - // } - // } } Action::Select => { if argv.is_empty() { @@ -541,55 +526,24 @@ impl Cli for WalletCli { impl WalletCli {} -// use kaspa_wallet_core::storage; use kaspa_wallet_core::runtime; -async fn select_account( - term: &Arc, - // prompt: &str, - // argv: &mut Vec, - // iface : &Arc, - wallet: &Arc, - // mut iter: Box>>, -) -> Result> { +async fn select_account(term: &Arc, wallet: &Arc) -> Result> { let mut selection = None; - // let list: Vec> = iter.collect().await?; - - // if !argv.is_empty() { - // let text = argv.remove(0); - // let matched = list - // .into_iter() - // // - TODO match by name - // .filter(|item| item.id().to_hex().starts_with(&text)) - // .collect::>(); - - // if matched.len() == 1 { - // return Ok(matched.first().cloned().unwrap()); - // } else { - // return Err(Error::MultipleMatches(text)); - // } - // } let mut list_by_key = Vec::<(Arc, Vec<(usize, Arc)>)>::new(); let mut flat_list = Vec::>::new(); - // let mut seq: usize = 0; - let mut keys = wallet.keys(); - while let Some(keys) = keys.next().await? { - for key in keys { - let mut prv_key_accounts = Vec::new(); - // term.writeln(format!("key: {}", key.id.to_hex())); - let mut accounts = wallet.accounts(Some(key.id)); - while let Some(accounts) = accounts.next().await? { - for account in accounts { - // term.writeln(format!("account: {}", account.id.to_hex())); - prv_key_accounts.push((flat_list.len(), account.clone())); - flat_list.push(account.clone()); - // seq += 1; - } - } - - list_by_key.push((key.clone(), prv_key_accounts)); + let mut keys = wallet.keys().await?; + while let Some(key) = keys.try_next().await? { + let mut prv_key_accounts = Vec::new(); + let mut accounts = wallet.accounts(Some(key.id)).await?; + while let Some(account) = accounts.next().await { + let account = account?; + prv_key_accounts.push((flat_list.len(), account.clone())); + flat_list.push(account.clone()); } + + list_by_key.push((key.clone(), prv_key_accounts)); } while selection.is_none() { @@ -601,10 +555,6 @@ async fn select_account( }) }); - // list.iter().enumerate().for_each(|(seq, item)| { - // term.writeln(format!("{}: {} ({})", seq + 1, item, item.id().to_hex())); - // }); - let text = term .ask(false, &format!("Please select account ({}..{}) or to abort: ", 0, flat_list.len() - 1)) .await? @@ -629,18 +579,17 @@ async fn select_account( } #[allow(dead_code)] -// async fn select_item(term : &Arc, prompt: &str, argv : &mut Vec, mut iter : impl kaspa_wallet_core::iterator::Iterator) -> Result async fn select_item( term: &Arc, prompt: &str, argv: &mut Vec, - mut iter: Box>>, + iter: impl Stream>>, ) -> Result> where T: std::fmt::Display + IdT + Clone + Send + Sync + 'static, { let mut selection = None; - let list: Vec> = iter.collect().await?; + let list = iter.try_collect::>().await?; if !argv.is_empty() { let text = argv.remove(0); diff --git a/wallet/core/src/imports.rs b/wallet/core/src/imports.rs index 91a712db5..b281fc8da 100644 --- a/wallet/core/src/imports.rs +++ b/wallet/core/src/imports.rs @@ -1,16 +1,21 @@ pub use crate::convert::ScriptPublicKeyTrait; pub use crate::error::Error; +pub use async_trait::async_trait; pub use borsh::{BorshDeserialize, BorshSerialize}; +pub use futures::stream::{self, Stream, StreamExt, Then, TryStreamExt}; pub use js_sys::{Array, Object}; pub use kaspa_addresses::Address; pub use kaspa_consensus_core::subnets; pub use kaspa_consensus_core::subnets::SubnetworkId; pub use kaspa_consensus_core::tx as cctx; pub use kaspa_consensus_core::tx::{ScriptPublicKey, TransactionId, TransactionIndexType}; -pub use kaspa_utils::hex::ToHex; +pub use kaspa_utils::hex::{FromHex, ToHex}; pub use serde::{Deserialize, Deserializer, Serialize}; +pub use std::pin::Pin; pub use std::sync::{Arc, Mutex, MutexGuard}; +pub use std::task::{Context, Poll}; pub use wasm_bindgen::prelude::*; pub use workflow_log::prelude::*; pub use workflow_wasm::jsvalue::*; pub use workflow_wasm::object::*; +pub use workflow_wasm::stream::AsyncStream; diff --git a/wallet/core/src/iterator.rs b/wallet/core/src/iterator.rs deleted file mode 100644 index 6ec059976..000000000 --- a/wallet/core/src/iterator.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::result::Result; -use async_trait::async_trait; - -#[derive(Default, Clone)] -pub struct IteratorOptions { - pub chunk_size: Option, -} - -#[async_trait] -pub trait Iterator: Send + Sync { - type Item: Send + Sync; - async fn next(&mut self) -> Result>>; - - async fn len(&mut self) -> Result { - let mut len = 0; - while let Some(chunk) = self.next().await? { - len += chunk.len(); - } - Ok(len) - } - - async fn is_empty(&mut self) -> Result { - Ok(self.len().await? == 0) - } - - async fn collect(&mut self) -> Result> { - let mut result = Vec::new(); - while let Some(chunk) = self.next().await? { - result.extend(chunk); - } - Ok(result) - } -} diff --git a/wallet/core/src/lib.rs b/wallet/core/src/lib.rs index 102d87cf2..e321a2775 100644 --- a/wallet/core/src/lib.rs +++ b/wallet/core/src/lib.rs @@ -7,7 +7,6 @@ pub mod convert; pub mod encryption; pub mod error; pub mod imports; -pub mod iterator; pub mod keypair; pub mod result; pub mod runtime; @@ -17,6 +16,7 @@ pub mod storage; pub mod tx; pub mod utils; pub mod utxo; +pub mod wasm; pub mod xprivatekey; pub mod xpublickey; diff --git a/wallet/core/src/runtime/account.rs b/wallet/core/src/runtime/account.rs index cb115bfbb..d686a99c5 100644 --- a/wallet/core/src/runtime/account.rs +++ b/wallet/core/src/runtime/account.rs @@ -6,7 +6,8 @@ use crate::result::Result; use crate::runtime::wallet::{BalanceUpdate, Events, Wallet}; use crate::secret::Secret; use crate::signer::sign_mutable_transaction; -use crate::storage::{self, PrvKeyData, PrvKeyDataId, PubKeyData}; +use crate::storage::interface::AccessContext; +use crate::storage::{self, AccessContextT, PrvKeyData, PrvKeyDataId, PubKeyData}; use crate::tx::{LimitCalcStrategy, PaymentOutput, PaymentOutputs, VirtualTransaction}; use crate::utxo::{UtxoEntryId, UtxoEntryReference, UtxoOrdering, UtxoSet}; use crate::AddressDerivationManager; @@ -143,9 +144,9 @@ pub struct Inner { /// Wallet `Account` data structure. An account is typically a single /// HD-key derivation (derived from a wallet or from an an external secret) -#[wasm_bindgen(inspectable)] +// #[wasm_bindgen(inspectable)] pub struct Account { - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub id: AccountId, inner: Arc>, wallet: Arc, @@ -153,17 +154,17 @@ pub struct Account { // balance: Arc, balance: Mutex>, is_connected: AtomicBool, - #[wasm_bindgen(js_name = "accountKind")] + // #[wasm_bindgen(js_name = "accountKind")] pub account_kind: AccountKind, pub account_index: u64, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub prv_key_data_id: PrvKeyDataId, pub ecdsa: bool, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub derivation: Arc, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub task_ctl: DuplexChannel, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub notification_channel: Channel, } @@ -260,8 +261,6 @@ impl Account { .broadcast(Events::Balance(balance_update)) .await .map_err(|_| Error::Custom("multiplexer channel error during update_balance".to_string()))?; - // self.wallet.multiplexer.broadcast(Events::Balance(balance_update)).await.map_err(Error::Custom("multiplexer channel error during update_balance".to_string())); - // self.wallet.multiplexer.broadcast(Events::Balance(self.clone())).await?; Ok(balance) } @@ -403,7 +402,8 @@ impl Account { address: &Address, amount_sompi: u64, priority_fee_sompi: u64, - keydata: PrvKeyData, + // keydata: PrvKeyData, + wallet_secret: Secret, payment_secret: Option, abortable: &Abortable, ) -> Result> { @@ -437,6 +437,15 @@ impl Account { let receive_indexes = indexes.0; let change_indexes = indexes.1; + let ctx: Arc = Arc::new(AccessContext::new(Some(wallet_secret))); + let keydata = self + .wallet + .store() + .as_prv_key_data_store()? + .load_key_data(&ctx, &self.prv_key_data_id) + .await? + .ok_or(Error::PrivateKeyNotFound(self.prv_key_data_id.to_hex()))?; + let private_keys = self.create_private_keys(keydata, payment_secret, receive_indexes, change_indexes)?; let private_keys = &private_keys.iter().map(|k| k.to_bytes()).collect::>(); let mut tx_ids = vec![]; diff --git a/wallet/core/src/runtime/iterators.rs b/wallet/core/src/runtime/iterators.rs deleted file mode 100644 index f3dc53658..000000000 --- a/wallet/core/src/runtime/iterators.rs +++ /dev/null @@ -1,101 +0,0 @@ -use super::{Account, Wallet}; -use crate::imports::*; -use crate::iterator::*; -use crate::result::Result; -use crate::storage; -use crate::storage::interface::Interface; -use crate::storage::PrvKeyDataId; -use crate::storage::PrvKeyDataInfo; -use async_trait::async_trait; -use futures::future::join_all; -use kaspa_addresses::Prefix as AddressPrefix; - -/// Runtime Account iterator. This iterator uses a storage iterator to -/// fetch all accounts from the storage, converting them into runtime accounts. -/// If an account already exists in the wallet runtime, the existing instance -/// is returned. -pub struct AccountIterator { - wallet: Arc, - store: Arc, - filter: Option, - options: IteratorOptions, - iter: Option>>>, -} - -impl AccountIterator { - pub fn new(wallet: &Arc, store: &Arc, filter: Option) -> AccountIterator { - AccountIterator { wallet: wallet.clone(), store: store.clone(), filter, options: IteratorOptions::default(), iter: None } - } - - pub fn new_with_options( - wallet: &Arc, - store: &Arc, - filter: Option, - options: IteratorOptions, - ) -> AccountIterator { - AccountIterator { wallet: wallet.clone(), store: store.clone(), filter, options, iter: None } - } - - async fn load_or_create(&self, stored: &storage::Account, prefix: AddressPrefix) -> Result> { - if let Some(account) = self.wallet.active_accounts().get(&stored.id) { - Ok(account) - } else { - Account::try_new_arc_from_storage(&self.wallet, stored, prefix).await - } - } -} - -#[async_trait] -impl Iterator for AccountIterator { - type Item = Arc; - - async fn next(&mut self) -> Result>> { - if self.iter.is_none() { - self.iter = Some(self.store.clone().as_account_store()?.iter_with_options(self.filter, self.options.clone()).await?); - } - - // use underlying iterator to fetch accounts - // if account is already loaded in the wallet, return it - // otherwise create a new (temporary) instance of the account - if let Some(accounts) = self.iter.as_mut().unwrap().next().await? { - let prefix: AddressPrefix = self.wallet.network().into(); - let accounts = accounts.iter().map(|stored| self.load_or_create(stored, prefix)).collect::>(); - let accounts = join_all(accounts).await.into_iter().collect::>>()?; - Ok(Some(accounts)) - } else { - Ok(None) - } - } -} - -pub struct PrvKeyDataIterator { - store: Arc, - options: IteratorOptions, - iter: Option>>>, -} - -impl PrvKeyDataIterator { - pub fn new(store: &Arc) -> PrvKeyDataIterator { - Self::new_with_options(store, IteratorOptions::default()) - } - pub fn new_with_options(store: &Arc, options: IteratorOptions) -> PrvKeyDataIterator { - PrvKeyDataIterator { store: store.clone(), options, iter: None } - } -} - -#[async_trait] -impl Iterator for PrvKeyDataIterator { - type Item = Arc; - - async fn next(&mut self) -> Result>> { - if self.iter.is_none() { - self.iter = Some(self.store.as_prv_key_data_store()?.iter_with_options(self.options.clone()).await?); - } - - if let Some(keydata) = self.iter.as_mut().unwrap().next().await? { - Ok(Some(keydata)) - } else { - Ok(None) - } - } -} diff --git a/wallet/core/src/runtime/mod.rs b/wallet/core/src/runtime/mod.rs index d59dd368b..9df5f7737 100644 --- a/wallet/core/src/runtime/mod.rs +++ b/wallet/core/src/runtime/mod.rs @@ -1,7 +1,5 @@ pub mod account; -pub mod iterators; pub mod wallet; pub use account::{Account, AccountId, AccountKind, AccountMap}; -pub use iterators::AccountIterator; pub use wallet::{BalanceUpdate, Events, Wallet}; diff --git a/wallet/core/src/runtime/wallet.rs b/wallet/core/src/runtime/wallet.rs index 94a31f13f..972dea319 100644 --- a/wallet/core/src/runtime/wallet.rs +++ b/wallet/core/src/runtime/wallet.rs @@ -1,6 +1,6 @@ -use crate::iterator::*; +// use crate::iterator::*; use crate::result::Result; -use crate::runtime::iterators::*; +// use crate::runtime::iterators::*; use crate::runtime::{Account, AccountId, AccountMap}; use crate::secret::Secret; use crate::storage::interface::{AccessContext, CreateArgs}; @@ -11,7 +11,8 @@ use crate::utxo::UtxoEntryReference; use crate::{accounts::gen0, accounts::gen0::import::*, accounts::gen1, accounts::gen1::import::*}; use crate::{imports::*, DynRpcApi}; use futures::future::join_all; -use futures::{select, FutureExt}; +use futures::stream::StreamExt; +use futures::{select, FutureExt, Stream}; use kaspa_addresses::Prefix as AddressPrefix; use kaspa_bip32::Mnemonic; use kaspa_consensus_core::networktype::NetworkType; @@ -105,16 +106,16 @@ pub struct Inner { /// `Wallet` data structure #[derive(Clone)] -#[wasm_bindgen] +// #[wasm_bindgen] pub struct Wallet { - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub rpc: Arc, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub multiplexer: Multiplexer, // #[wasm_bindgen(skip)] // pub rpc_client: Arc, inner: Arc, - #[wasm_bindgen(skip)] + // #[wasm_bindgen(skip)] pub virtual_daa_score: Arc, } @@ -199,27 +200,37 @@ impl Wallet { } // pub fn load_accounts(&self, stored_accounts: Vec) => Result<()> { - pub async fn load(self: &Arc, secret: Secret, prefix: AddressPrefix) -> Result<()> { + pub async fn load(self: &Arc, secret: Secret, _prefix: AddressPrefix) -> Result<()> { // - TODO - RESET? self.reset().await?; use storage::interface::*; use storage::local::interface::*; - let ctx = Arc::new(AccessContext::new(Some(secret))); - let ctx: Arc = ctx; + let ctx: Arc = Arc::new(AccessContext::new(Some(secret))); + // let ctx: Arc = ctx; // let local_store = Arc::new(LocalStore::try_new(None, storage::local::DEFAULT_WALLET_FILE)?); let local_store = Arc::new(LocalStore::try_new()?); local_store.open(&ctx, OpenArgs::new(None)).await?; // let iface : Arc = local_store; let store_accounts = local_store.as_account_store()?; let mut iter = store_accounts.iter(None).await?; + // pin_mut!(iter); + // let mut iter = Box::pin(iter); + // let mut iter = iter; + // pin!(iter); + // let v = iter; // while let Some(ids) = iter.next().await { - while let Some(accounts) = iter.next().await? { + + // iter.for_each() + + while let Some(_accounts) = iter.try_next().await? { // let accounts = store_accounts.load(&ctx, &ids).await?; - let accounts = accounts.iter().map(|stored| Account::try_new_arc_from_storage(self, stored, prefix)).collect::>(); - let _accounts = join_all(accounts).await.into_iter().collect::>>()?; + // let account = accounts?; + + // let accounts = accounts.iter().map(|stored| Account::try_new_arc_from_storage(self, stored, prefix)).collect::>(); + // let _accounts = join_all(accounts).await.into_iter().collect::>>()?; // let accounts = accounts.into_iter().map(Arc::new).collect::>(); todo!(); @@ -697,12 +708,30 @@ impl Wallet { self.inner.store.exists(name).await } - pub fn keys(self: &Arc) -> Box>> { - Box::new(PrvKeyDataIterator::new(&self.inner.store)) + pub async fn keys(&self) -> Result>>> { + self.inner.store.as_prv_key_data_store().unwrap().iter().await } - pub fn accounts(self: &Arc, filter: Option) -> Box>> { - Box::new(AccountIterator::new(self, &self.inner.store, filter)) + pub async fn accounts(self: &Arc, filter: Option) -> Result>>> { + let iter = self.inner.store.as_account_store().unwrap().iter(filter).await.unwrap(); + let wallet = self.clone(); + + let stream = iter.then(move |stored| { + let wallet = wallet.clone(); + async move { + // TODO - set prefix in the Wallet + let prefix: AddressPrefix = wallet.network().into(); + + let stored = stored.unwrap(); + if let Some(account) = wallet.active_accounts().get(&stored.id) { + Ok(account) + } else { + Account::try_new_arc_from_storage(&wallet, &stored, prefix).await + } + } + }); + + Ok(Box::pin(stream)) } } diff --git a/wallet/core/src/storage/interface.rs b/wallet/core/src/storage/interface.rs index 147d2bb52..a6215e423 100644 --- a/wallet/core/src/storage/interface.rs +++ b/wallet/core/src/storage/interface.rs @@ -1,5 +1,4 @@ use crate::imports::*; -use crate::iterator::*; use crate::result::Result; use crate::secret::Secret; use async_trait::async_trait; @@ -35,13 +34,11 @@ impl AccessContextT for AccessContext { } } +pub type StorageStream = Pin>> + Send>>; + #[async_trait] pub trait PrvKeyDataStore: Send + Sync { - async fn iter(self: Arc) -> Result>>> { - self.iter_with_options(IteratorOptions::default()).await - } - - async fn iter_with_options(self: Arc, options: IteratorOptions) -> Result>>>; + async fn iter(&self) -> Result>; async fn load_key_info(&self, id: &PrvKeyDataId) -> Result>>; async fn load_key_data(&self, ctx: &Arc, id: &PrvKeyDataId) -> Result>; async fn store(&self, ctx: &Arc, data: PrvKeyData) -> Result<()>; @@ -50,15 +47,7 @@ pub trait PrvKeyDataStore: Send + Sync { #[async_trait] pub trait AccountStore: Send + Sync { - async fn iter(self: Arc, prv_key_data_id_filter: Option) -> Result>>> { - self.iter_with_options(prv_key_data_id_filter, IteratorOptions::default()).await - } - - async fn iter_with_options( - self: Arc, - prv_key_data_id_filter: Option, - options: IteratorOptions, - ) -> Result>>>; + async fn iter(&self, prv_key_data_id_filter: Option) -> Result>; async fn len(self: Arc, prv_key_data_id_filter: Option) -> Result; async fn load(&self, ids: &[AccountId]) -> Result>>; async fn store(&self, data: &[&Account]) -> Result<()>; @@ -67,21 +56,13 @@ pub trait AccountStore: Send + Sync { #[async_trait] pub trait MetadataStore: Send + Sync { - async fn iter(self: Arc, prv_key_data_id_filter: Option) -> Result>>> { - self.iter_with_options(prv_key_data_id_filter, IteratorOptions::default()).await - } - - async fn iter_with_options( - self: Arc, - prv_key_data_id_filter: Option, - options: IteratorOptions, - ) -> Result>>>; + async fn iter(&self, prv_key_data_id_filter: Option) -> Result>; async fn load(&self, id: &[AccountId]) -> Result>>; } #[async_trait] pub trait TransactionRecordStore: Send + Sync { - async fn iter(self: Arc, options: IteratorOptions) -> Result>>; + async fn iter(&self) -> Result>; async fn load(&self, id: &[TransactionRecordId]) -> Result>>; async fn store(&self, data: &[&TransactionRecord]) -> Result<()>; async fn remove(&self, id: &[&TransactionRecordId]) -> Result<()>; @@ -128,8 +109,7 @@ pub trait Interface: Send + Sync + AnySync { // stop the storage subsystem async fn close(&self) -> Result<()>; - // ~~~ - + // return storage information string (file location) async fn descriptor(&self) -> Result>; // ~~~ diff --git a/wallet/core/src/storage/keydata.rs b/wallet/core/src/storage/keydata.rs index 3e495f8c2..12ede7ecc 100644 --- a/wallet/core/src/storage/keydata.rs +++ b/wallet/core/src/storage/keydata.rs @@ -28,6 +28,15 @@ impl ToHex for KeyDataId { } } +impl FromHex for KeyDataId { + type Error = Error; + fn from_hex(hex_str: &str) -> Result { + let mut data = vec![0u8; hex_str.len() / 2]; + hex_decode(hex_str.as_bytes(), &mut data)?; + Ok(Self::new_from_slice(&data)) + } +} + impl std::fmt::Debug for KeyDataId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "KeyDataId ( {:?} )", self.0) diff --git a/wallet/core/src/storage/local/cache.rs b/wallet/core/src/storage/local/cache.rs new file mode 100644 index 000000000..e623896d2 --- /dev/null +++ b/wallet/core/src/storage/local/cache.rs @@ -0,0 +1,52 @@ +use crate::imports::*; +use crate::result::Result; +use crate::secret::Secret; +use crate::storage::local::wallet::Wallet; +use crate::storage::local::*; +use crate::storage::*; +use std::collections::HashMap; + +pub struct Cache { + pub user_hint: Option, + pub prv_key_data: Encrypted, + pub prv_key_data_info: Collection, + pub accounts: Collection, + pub metadata: Collection, + pub transaction_records: Collection, +} + +impl TryFrom<(Wallet, &Secret)> for Cache { + type Error = Error; + fn try_from((wallet, secret): (Wallet, &Secret)) -> Result { + let payload = wallet.payload(secret.clone())?; + + let prv_key_data_info = + payload.0.prv_key_data.iter().map(|pkdata| pkdata.into()).collect::>().try_into()?; + + let prv_key_data_map = payload.0.prv_key_data.into_iter().map(|pkdata| (pkdata.id, pkdata)).collect::>(); + let prv_key_data: Decrypted = Decrypted::new(prv_key_data_map); + let prv_key_data = prv_key_data.encrypt(secret.clone())?; + let accounts: Collection = payload.0.accounts.try_into()?; + let metadata: Collection = wallet.metadata.try_into()?; + let user_hint = wallet.user_hint; + let transaction_records: Collection = payload.0.transaction_records.try_into()?; + + Ok(Cache { prv_key_data, prv_key_data_info, accounts, metadata, transaction_records, user_hint }) + } +} + +impl TryFrom<(&Cache, &Secret)> for Wallet { + type Error = Error; + + fn try_from((cache, secret): (&Cache, &Secret)) -> Result { + let prv_key_data: Decrypted = cache.prv_key_data.decrypt(secret.clone())?; + let prv_key_data = prv_key_data.values().cloned().collect::>(); + let accounts: Vec = (&cache.accounts).try_into()?; + let metadata: Vec = (&cache.metadata).try_into()?; + let transaction_records: Vec = (&cache.transaction_records).try_into()?; + let payload = Payload { prv_key_data, accounts, transaction_records }; + let payload = Decrypted::new(payload).encrypt(secret.clone())?; + + Ok(Wallet { payload, metadata, user_hint: cache.user_hint.clone() }) + } +} diff --git a/wallet/core/src/storage/local/interface.rs b/wallet/core/src/storage/local/interface.rs index b1fe636c5..894b714b6 100644 --- a/wallet/core/src/storage/local/interface.rs +++ b/wallet/core/src/storage/local/interface.rs @@ -1,66 +1,18 @@ -use std::collections::HashMap; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; - use crate::imports::*; -use crate::iterator::*; use crate::result::Result; -use crate::secret::Secret; use crate::storage::interface::CreateArgs; use crate::storage::interface::OpenArgs; -use crate::storage::local::iterators::*; +use crate::storage::interface::StorageStream; +use crate::storage::local::cache::*; +use crate::storage::local::streams::*; use crate::storage::local::wallet::Wallet; use crate::storage::local::*; use crate::storage::*; -use async_trait::async_trait; - -pub struct Cache { - pub user_hint: Option, - pub prv_key_data: Encrypted, - pub prv_key_data_info: Collection, - pub accounts: Collection, - pub metadata: Collection, - pub transaction_records: Collection, -} - -impl TryFrom<(Wallet, &Secret)> for Cache { - type Error = Error; - fn try_from((wallet, secret): (Wallet, &Secret)) -> Result { - let payload = wallet.payload(secret.clone())?; - - let prv_key_data_info = - payload.0.prv_key_data.iter().map(|pkdata| pkdata.into()).collect::>().try_into()?; - - let prv_key_data_map = payload.0.prv_key_data.into_iter().map(|pkdata| (pkdata.id, pkdata)).collect::>(); - let prv_key_data: Decrypted = Decrypted::new(prv_key_data_map); - let prv_key_data = prv_key_data.encrypt(secret.clone())?; - let accounts: Collection = payload.0.accounts.try_into()?; - let metadata: Collection = wallet.metadata.try_into()?; - let user_hint = wallet.user_hint; - let transaction_records: Collection = payload.0.transaction_records.try_into()?; - - Ok(Cache { prv_key_data, prv_key_data_info, accounts, metadata, transaction_records, user_hint }) - } -} - -impl TryFrom<(&Cache, &Secret)> for Wallet { - type Error = Error; - - fn try_from((cache, secret): (&Cache, &Secret)) -> Result { - let prv_key_data: Decrypted = cache.prv_key_data.decrypt(secret.clone())?; - let prv_key_data = prv_key_data.values().cloned().collect::>(); - let accounts: Vec = (&cache.accounts).try_into()?; - let metadata: Vec = (&cache.metadata).try_into()?; - let transaction_records: Vec = (&cache.transaction_records).try_into()?; - let payload = Payload { prv_key_data, accounts, transaction_records }; - let payload = Decrypted::new(payload).encrypt(secret.clone())?; - - Ok(Wallet { payload, metadata, user_hint: cache.user_hint.clone() }) - } -} +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; pub(crate) struct LocalStoreInner { - pub cache: Mutex, + pub cache: Arc>, pub store: Store, pub modified: AtomicBool, } @@ -80,7 +32,7 @@ impl LocalStoreInner { let secret = ctx.wallet_secret().await.expect("wallet requires an encryption secret"); let payload = Payload::default(); let wallet = Wallet::try_new(secret.clone(), payload)?; - let cache = Mutex::new(Cache::try_from((wallet, &secret))?); + let cache = Arc::new(Mutex::new(Cache::try_from((wallet, &secret))?)); let modified = AtomicBool::new(false); Ok(Self { cache, store, modified }) @@ -92,7 +44,7 @@ impl LocalStoreInner { let secret = ctx.wallet_secret().await.expect("wallet requires an encryption secret"); let wallet = Wallet::try_load(&store).await?; - let cache = Mutex::new(Cache::try_from((wallet, &secret))?); + let cache = Arc::new(Mutex::new(Cache::try_from((wallet, &secret))?)); let modified = AtomicBool::new(false); Ok(Self { cache, store, modified }) @@ -135,8 +87,6 @@ impl Drop for LocalStoreInner { if self.is_modified() { panic!("LocalStoreInner::drop called while modified flag is true"); } - - // if self.cache() } } @@ -252,8 +202,8 @@ impl Interface for LocalStore { #[async_trait] impl PrvKeyDataStore for LocalStoreInner { - async fn iter_with_options(self: Arc, options: IteratorOptions) -> Result>>> { - Ok(Box::new(KeydataIterator::new(self, options))) + async fn iter(&self) -> Result> { + Ok(Box::pin(PrvKeyDataInfoStream::new(self.cache.clone()))) } async fn load_key_info(&self, prv_key_data_id: &PrvKeyDataId) -> Result>> { @@ -288,12 +238,8 @@ impl PrvKeyDataStore for LocalStoreInner { #[async_trait] impl AccountStore for LocalStoreInner { - async fn iter_with_options( - self: Arc, - prv_key_data_id_filter: Option, - options: IteratorOptions, - ) -> Result>>> { - Ok(Box::new(AccountIterator::new(self, prv_key_data_id_filter, options))) + async fn iter(&self, prv_key_data_id_filter: Option) -> Result> { + Ok(Box::pin(AccountStream::new(self.cache.clone(), prv_key_data_id_filter))) } async fn len(self: Arc, prv_key_data_id_filter: Option) -> Result { @@ -342,12 +288,8 @@ impl AccountStore for LocalStoreInner { #[async_trait] impl MetadataStore for LocalStoreInner { - async fn iter_with_options( - self: Arc, - filter: Option, - options: IteratorOptions, - ) -> Result>>> { - Ok(Box::new(MetadataIterator::new(self, filter, options))) + async fn iter(&self, prv_key_data_id_filter: Option) -> Result> { + Ok(Box::pin(MetadataStream::new(self.cache.clone(), prv_key_data_id_filter))) } async fn load(&self, ids: &[AccountId]) -> Result>> { @@ -357,8 +299,8 @@ impl MetadataStore for LocalStoreInner { #[async_trait] impl TransactionRecordStore for LocalStoreInner { - async fn iter(self: Arc, options: IteratorOptions) -> Result>> { - Ok(Box::new(TransactionRecordIterator::new(self, options))) + async fn iter(&self) -> Result> { + Ok(Box::pin(TransactionRecordStream::new(self.cache.clone()))) } async fn load(&self, ids: &[TransactionRecordId]) -> Result>> { diff --git a/wallet/core/src/storage/local/iterators.rs b/wallet/core/src/storage/local/iterators.rs deleted file mode 100644 index 78c17a69a..000000000 --- a/wallet/core/src/storage/local/iterators.rs +++ /dev/null @@ -1,207 +0,0 @@ -use crate::imports::*; -use crate::iterator::*; -// use crate::storage::local::*; -use crate::result::Result; -use crate::storage::local::interface::LocalStoreInner; -use crate::storage::*; -use async_trait::async_trait; - -const DEFAULT_CHUNK_SIZE: usize = 25; - -#[derive(Clone)] -struct StoreIteratorInner { - store: Arc, - cursor: usize, - chunk_size: usize, - // filter : Option< -} - -impl std::fmt::Debug for StoreIteratorInner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("StoreIteratorInner").field("cursor", &self.cursor).finish() - } -} - -pub struct KeydataIterator { - inner: StoreIteratorInner, -} - -impl KeydataIterator { - pub(crate) fn new(store: Arc, iterator_options: IteratorOptions) -> Self { - Self { inner: StoreIteratorInner { store, cursor: 0, chunk_size: iterator_options.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE) } } - } -} - -#[async_trait] -impl Iterator for KeydataIterator { - type Item = Arc; - - async fn next(&mut self) -> Result>> { - let vec = &self.inner.store.cache().prv_key_data_info.vec; - if self.inner.cursor >= vec.len() { - return Ok(None); - } else { - let slice = &vec[self.inner.cursor..(self.inner.cursor + self.inner.chunk_size)]; - if slice.is_empty() { - Ok(None) - } else { - self.inner.cursor += slice.len(); - Ok(Some(slice.to_vec())) - } - } - } -} - -pub struct AccountIterator { - inner: StoreIteratorInner, - filter: Option, -} - -impl AccountIterator { - pub(crate) fn new(store: Arc, filter: Option, iterator_options: IteratorOptions) -> Self { - Self { - inner: StoreIteratorInner { store, cursor: 0, chunk_size: iterator_options.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE) }, - filter, - } - } -} - -#[async_trait] -impl Iterator for AccountIterator { - type Item = Arc; - - async fn next(&mut self) -> Result>> { - let vec = &self.inner.store.cache().accounts.vec; - if self.inner.cursor >= vec.len() { - return Ok(None); - } else { - match self.filter { - None => { - let slice = &vec[self.inner.cursor..(self.inner.cursor + self.inner.chunk_size)]; - if slice.is_empty() { - Ok(None) - } else { - self.inner.cursor += slice.len(); - Ok(Some(slice.to_vec())) - } - } - Some(filter) => { - let slice = &vec[self.inner.cursor..]; - - let mut accumulator = Vec::new(); - if slice.is_empty() { - return Ok(None); - } else { - for account in slice { - self.inner.cursor += 1; - if account.prv_key_data_id == filter { - accumulator.push(account.clone()); - if accumulator.len() >= self.inner.chunk_size { - break; - } - } - } - - if accumulator.is_empty() { - Ok(None) - } else { - Ok(Some(accumulator)) - } - } - } - } - } - } -} - -pub struct MetadataIterator { - inner: StoreIteratorInner, - filter: Option, -} - -impl MetadataIterator { - pub(crate) fn new(store: Arc, filter: Option, iterator_options: IteratorOptions) -> Self { - Self { - inner: StoreIteratorInner { store, cursor: 0, chunk_size: iterator_options.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE) }, - filter, - } - } -} - -#[async_trait] -impl Iterator for MetadataIterator { - type Item = Arc; - - async fn next(&mut self) -> Result>> { - let vec = &self.inner.store.cache().metadata.vec; - if self.inner.cursor >= vec.len() { - return Ok(None); - } else { - match self.filter { - None => { - let slice = &vec[self.inner.cursor..(self.inner.cursor + self.inner.chunk_size)]; - if slice.is_empty() { - Ok(None) - } else { - self.inner.cursor += slice.len(); - Ok(Some(slice.to_vec())) - } - } - Some(filter) => { - let slice = &vec[self.inner.cursor..]; - - let mut accumulator = Vec::new(); - if slice.is_empty() { - return Ok(None); - } else { - for account in slice { - self.inner.cursor += 1; - if account.prv_key_data_id == filter { - accumulator.push(account.clone()); - if accumulator.len() >= self.inner.chunk_size { - break; - } - } - } - - if accumulator.is_empty() { - Ok(None) - } else { - Ok(Some(accumulator)) - } - } - } - } - } - } -} - -#[derive(Clone, Debug)] -pub struct TransactionRecordIterator { - inner: StoreIteratorInner, -} - -impl TransactionRecordIterator { - pub(crate) fn new(store: Arc, iterator_options: IteratorOptions) -> Self { - Self { inner: StoreIteratorInner { store, cursor: 0, chunk_size: iterator_options.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE) } } - } -} -#[async_trait] -impl Iterator for TransactionRecordIterator { - type Item = TransactionRecordId; - async fn next(&mut self) -> Result>> { - let vec = &self.inner.store.cache().transaction_records.vec; - if self.inner.cursor >= vec.len() { - return Ok(None); - } else { - let slice = &vec[self.inner.cursor..(self.inner.cursor + self.inner.chunk_size).min(vec.len())]; - if slice.is_empty() { - Ok(None) - } else { - self.inner.cursor += slice.len(); - let vec = slice.iter().map(|data| data.id).collect(); - Ok(Some(vec)) - } - } - } -} diff --git a/wallet/core/src/storage/local/mod.rs b/wallet/core/src/storage/local/mod.rs index 5d1e8ca1c..018ac2f3b 100644 --- a/wallet/core/src/storage/local/mod.rs +++ b/wallet/core/src/storage/local/mod.rs @@ -1,7 +1,8 @@ +pub mod cache; pub mod collection; pub mod interface; -pub mod iterators; pub mod store; +pub mod streams; pub mod wallet; pub use collection::Collection; diff --git a/wallet/core/src/storage/local/store.rs b/wallet/core/src/storage/local/store.rs index eb0a762b7..751104997 100644 --- a/wallet/core/src/storage/local/store.rs +++ b/wallet/core/src/storage/local/store.rs @@ -72,7 +72,3 @@ impl Store { Ok(()) } } - -// pub struct Settings; - -// #[derive(Default)] diff --git a/wallet/core/src/storage/local/streams.rs b/wallet/core/src/storage/local/streams.rs new file mode 100644 index 000000000..548fd16e9 --- /dev/null +++ b/wallet/core/src/storage/local/streams.rs @@ -0,0 +1,154 @@ +use crate::imports::*; +use crate::result::Result; +use crate::storage::local::cache::Cache; +use crate::storage::*; + +#[derive(Clone)] +struct StoreStreamInner { + cache: Arc>, + cursor: usize, +} + +impl StoreStreamInner { + fn new(cache: Arc>) -> Self { + Self { cache, cursor: 0 } + } +} + +impl std::fmt::Debug for StoreStreamInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StoreIteratorInner").field("cursor", &self.cursor).finish() + } +} + +pub struct PrvKeyDataInfoStream { + inner: StoreStreamInner, +} + +impl PrvKeyDataInfoStream { + pub(crate) fn new(cache: Arc>) -> Self { + Self { inner: StoreStreamInner::new(cache) } + } +} + +impl Stream for PrvKeyDataInfoStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let cache = self.inner.cache.clone(); + let cache = cache.lock().unwrap(); + let vec = &cache.prv_key_data_info.vec; + if self.inner.cursor < vec.len() { + let prv_key_data_info = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + Poll::Ready(Some(Ok(prv_key_data_info))) + } else { + Poll::Ready(None) + } + } +} + +pub struct AccountStream { + inner: StoreStreamInner, + filter: Option, +} + +impl AccountStream { + pub(crate) fn new(cache: Arc>, filter: Option) -> Self { + Self { inner: StoreStreamInner::new(cache), filter } + } +} + +impl Stream for AccountStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let cache = self.inner.cache.clone(); + let cache = cache.lock().unwrap(); + let vec = &cache.accounts.vec; + + if let Some(filter) = self.filter { + while self.inner.cursor < vec.len() { + let account = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + if account.prv_key_data_id == filter { + return Poll::Ready(Some(Ok(account))); + } + } + Poll::Ready(None) + } else if self.inner.cursor < vec.len() { + let account = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + Poll::Ready(Some(Ok(account))) + } else { + Poll::Ready(None) + } + } +} + +pub struct MetadataStream { + inner: StoreStreamInner, + filter: Option, +} + +impl MetadataStream { + pub(crate) fn new(cache: Arc>, filter: Option) -> Self { + Self { inner: StoreStreamInner::new(cache), filter } + } +} + +impl Stream for MetadataStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let cache = self.inner.cache.clone(); + let cache = cache.lock().unwrap(); + let vec = &cache.metadata.vec; + + if let Some(filter) = self.filter { + while self.inner.cursor < vec.len() { + let account = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + if account.prv_key_data_id == filter { + return Poll::Ready(Some(Ok(account))); + } + } + Poll::Ready(None) + } else if self.inner.cursor < vec.len() { + let account = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + Poll::Ready(Some(Ok(account))) + } else { + Poll::Ready(None) + } + } +} + +#[derive(Clone, Debug)] +pub struct TransactionRecordStream { + inner: StoreStreamInner, +} + +impl TransactionRecordStream { + pub(crate) fn new(cache: Arc>) -> Self { + Self { inner: StoreStreamInner::new(cache) } + } +} + +impl Stream for TransactionRecordStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let cache = self.inner.cache.clone(); + let cache = cache.lock().unwrap(); + let vec = &cache.transaction_records.vec; + + if self.inner.cursor < vec.len() { + let transaction_record = vec[self.inner.cursor].clone(); + self.inner.cursor += 1; + Poll::Ready(Some(Ok(transaction_record))) + } else { + Poll::Ready(None) + } + } +} diff --git a/wallet/core/src/tx/transaction.rs b/wallet/core/src/tx/transaction.rs index b8bb124ba..0f5ca7011 100644 --- a/wallet/core/src/tx/transaction.rs +++ b/wallet/core/src/tx/transaction.rs @@ -188,7 +188,7 @@ impl TryFrom for Transaction { if js_value.is_object() { let object = Object::from(js_value); let version = object.get_u16("version")?; - workflow_log::log_trace!("JsValue->Transaction: version: {version:?}"); + // workflow_log::log_trace!("JsValue->Transaction: version: {version:?}"); let lock_time = object.get_u64("lockTime")?; let gas = object.get_u64("gas")?; let payload = object.get_vec_u8("payload")?; @@ -198,13 +198,13 @@ impl TryFrom for Transaction { } let subnetwork_id: SubnetworkId = subnetwork_id.as_slice().try_into().map_err(|err| Error::Custom(format!("`subnetworkId` property error: `{err}`")))?; - workflow_log::log_trace!("JsValue->Transaction: subnetwork_id: {subnetwork_id:?}"); + // workflow_log::log_trace!("JsValue->Transaction: subnetwork_id: {subnetwork_id:?}"); let inputs = object .get_vec("inputs")? .into_iter() .map(|jsv| jsv.try_into()) .collect::, Error>>()?; - workflow_log::log_trace!("JsValue->Transaction: inputs.len(): {:?}", inputs.len()); + // workflow_log::log_trace!("JsValue->Transaction: inputs.len(): {:?}", inputs.len()); let jsv_outputs = object.get("outputs")?; let outputs: Vec = if !jsv_outputs.is_array() { let outputs: PaymentOutputs = jsv_outputs.try_into()?; @@ -219,7 +219,7 @@ impl TryFrom for Transaction { }) .collect::, Error>>()? }; - workflow_log::log_trace!("JsValue->Transaction: outputs: {outputs:?}"); + // workflow_log::log_trace!("JsValue->Transaction: outputs: {outputs:?}"); Transaction::new(version, inputs, outputs, lock_time, subnetwork_id, gas, payload) } else { Err("Transaction must be an object".into()) diff --git a/wallet/core/src/wasm/account.rs b/wallet/core/src/wasm/account.rs new file mode 100644 index 000000000..41c7247f6 --- /dev/null +++ b/wallet/core/src/wasm/account.rs @@ -0,0 +1,161 @@ +// use std::sync::atomic::AtomicBool; +// use std::sync::atomic::Ordering; + +use crate::imports::*; +use crate::runtime; +use crate::runtime::AccountKind; +#[allow(unused_imports)] +use crate::secret::Secret; +#[allow(unused_imports)] +use crate::storage::PrvKeyDataId; +use js_sys::BigInt; +#[allow(unused_imports)] +use js_sys::Reflect; +#[allow(unused_imports)] +use workflow_core::abortable::Abortable; +// use wasm_bindgen::wasm_bindgen; +// use wasm_bindgen::prelude::*; +use crate::result::Result; +// use crate::iterator::*; + +#[wasm_bindgen] +#[derive(Clone)] +pub struct Account { + inner: Arc, + receive_address_cache: Arc>>, + change_address_cache: Arc>>, + // abortable: Arc, +} + +// #[wasm_bindgen(constructor)] +// pub fn constructor(_js_value: JsValue) -> std::result::Result { +// todo!(); +// // Ok(js_value.try_into()?) +// } + +#[wasm_bindgen] +impl Account { + #[wasm_bindgen(getter)] + pub fn balance(&self) -> JsValue { + match self.inner.balance() { + Some(balance) => BigInt::from(balance).into(), + None => JsValue::UNDEFINED, + } + } + + #[wasm_bindgen(getter)] + pub fn kind(&self) -> AccountKind { + self.inner.account_kind + } + + #[wasm_bindgen(getter)] + pub fn index(&self) -> u64 { + self.inner.account_index + } + + #[wasm_bindgen(getter, js_name = "privateKeyId")] + pub fn private_key_id(&self) -> String { + self.inner.prv_key_data_id.to_hex() + } + + #[wasm_bindgen(getter, js_name = "isECDSA")] + pub fn is_ecdsa(&self) -> bool { + self.inner.ecdsa + } + + #[wasm_bindgen(getter, js_name = "receiveAddress")] + pub fn receive_address(&self) -> Address { + self.receive_address_cache.lock().unwrap().clone().unwrap() + } + + #[wasm_bindgen(getter, js_name = "changeAddress")] + pub fn change_address(&self) -> Address { + self.change_address_cache.lock().unwrap().clone().unwrap() + } + + #[wasm_bindgen(js_name = "getReceiveAddress")] + pub async fn get_receive_address(&self) -> Result
{ + self.inner.derivation.receive_address_manager.current_address().await + } + + #[wasm_bindgen(js_name = "createReceiveAddress")] + pub async fn create_receive_address(&self) -> Result
{ + self.inner.derivation.receive_address_manager.new_address().await + } + + #[wasm_bindgen(js_name = "getChangeAddress")] + pub async fn get_change_address(&self) -> Result
{ + self.inner.derivation.change_address_manager.current_address().await + } + + #[wasm_bindgen(js_name = "createChangeAddress")] + pub async fn create_change_address(&self) -> Result
{ + self.inner.derivation.change_address_manager.new_address().await + } + + pub async fn scan(&self) -> Result<()> { + self.inner.scan_utxos(None, None).await + } + + pub async fn send( + &self, + // address: &Address, + // amount_sompi: u64, + // priority_fee_sompi: u64, + // keydata: PrvKeyData, + // payment_secret: Option, + // abortable: &Abortable, + // ) -> Result> { + js_value: JsValue, + ) -> Result { + let _args = SendArgs::try_from(js_value)?; + + todo!() + } +} + +impl Account { + pub async fn update_addresses(&self) -> Result<()> { + let receive_address = self.inner.derivation.receive_address_manager.current_address().await?; + let change_address = self.inner.derivation.receive_address_manager.current_address().await?; + self.receive_address_cache.lock().unwrap().replace(receive_address); + self.change_address_cache.lock().unwrap().replace(change_address); + Ok(()) + } +} + +struct SendArgs { + // outputs : Vec<(Address, u64)>, + // priority_fee_sompi: u64, + // wallet_secret: Option, + // payment_secret: Option, + // abortable: Abortable, +} + +impl TryFrom for SendArgs { + type Error = Error; + fn try_from(js_value: JsValue) -> std::result::Result { + if js_value.is_object() { + let _object = Object::from(js_value); + + // let outputs = object.get_vec("outputs")?; + + // let outputs = { + // let outputs = Reflect::get(&object, &JsValue::from("outputs"))?; + // if outputs != JsValue::UNDEFINED { + // let array = outputs.dyn_into::().map_err(|err| Error::Custom(format!("`outputs` property must be an Array")))?; + // let vec = array.to_vec(); + + // // return Err(Error::MissingProperty(prop.to_string())); + // } else { + // let to = Reflect::get(&object, &JsValue::from("to"))?; + + // } + // }; + + todo!() + } else { + Err("Argument to Account::send() must be an object".into()) + } + } +} diff --git a/wallet/core/src/wasm/mod.rs b/wallet/core/src/wasm/mod.rs new file mode 100644 index 000000000..5cbbdcc0d --- /dev/null +++ b/wallet/core/src/wasm/mod.rs @@ -0,0 +1,3 @@ +pub mod account; +pub mod tests; +pub mod wallet; diff --git a/wallet/core/src/wasm/tests.rs b/wallet/core/src/wasm/tests.rs new file mode 100644 index 000000000..9e1f773d9 --- /dev/null +++ b/wallet/core/src/wasm/tests.rs @@ -0,0 +1,57 @@ +use crate::imports::*; +// use wasm_bindgen::prelude::*; +// use wasm_bindgen::{prelude::*, JsCast, JsValue}; +// use workflow_wasm::object::*; + +// struct Iterable { +// value: js_sys::IntoIter, +// phantom: PhantomData, +// } + +// impl Iterable { +// #[allow(dead_code)] +// fn unchecked_new(value: &JsValue) -> Self { +// Self { value: js_sys::try_iter(value).unwrap().unwrap(), phantom: PhantomData } +// } +// } + +// impl Iterator for Iterable +// where +// A: JsCast, +// { +// type Item = A; + +// #[inline] +// fn next(&mut self) -> Option { +// self.value.next().map(|x| JsCast::unchecked_from_js(x.unwrap())) +// } +// } + +// #[wasm_bindgen(inline_js = " +// export function foo(obj) { +// obj[Symbol.iterator] = function () { +// return this; +// }; +// } +// ")] +// extern "C" { +// fn foo(obj: &Object); +// } + +// // #[wasm_bindgen(start)] +// pub fn test_iter(object: &Object) -> Result<(), JsValue> { +// // ... +// // This works, but I couldn't figure out how to get the prototype of an object without instantiating a copy first +// foo(&Object::get_prototype_of(&object.into())); +// Ok(()) +// } + +// // obj[Symbol.iterator] = function () { +// // return this; +// // }; + +#[wasm_bindgen] +pub fn get_async_iter() -> JsValue { + let iter = stream::iter(0..30); + AsyncStream::new(iter).into() +} diff --git a/wallet/core/src/wasm/wallet.rs b/wallet/core/src/wasm/wallet.rs new file mode 100644 index 000000000..b47c7d964 --- /dev/null +++ b/wallet/core/src/wasm/wallet.rs @@ -0,0 +1,15 @@ +use crate::imports::*; +use crate::runtime; +// use crate::iterator::*; + +#[derive(Clone)] +#[wasm_bindgen] +pub struct Wallet { + _inner: Arc, +} + +// #[wasm_bindgen(constructor)] +// pub fn constructor(_js_value: JsValue) -> std::result::Result { +// todo!(); +// // Ok(js_value.try_into()?) +// } diff --git a/wasm/nodejs/test.js b/wasm/nodejs/test.js new file mode 100644 index 000000000..e83c0babc --- /dev/null +++ b/wasm/nodejs/test.js @@ -0,0 +1,29 @@ +globalThis.WebSocket = require('websocket').w3cwebsocket; // W3C WebSocket module shim + +let kaspa = require('./kaspa/kaspa_wasm'); +let { + // RpcClient, + // Encoding, +} = kaspa; +kaspa.init_console_panic_hook(); + +(async ()=>{ + + let iter = kaspa.get_async_iter(); + console.log("iter ->",iter); + for await (let item of iter) { + console.log("item ->",item); + } + + // let URL = "ws://127.0.0.1:17110"; + // let rpc = new RpcClient(Encoding.Borsh,URL); + + // console.log(`# connecting to ${URL}`) + // await rpc.connect(); + + // let info = await rpc.getBlockDagInfo(); + // console.log("info:", info); + + // await rpc.disconnect(); + +})(); \ No newline at end of file