Skip to content

Commit

Permalink
Modify wallet migration (#1006)
Browse files Browse the repository at this point in the history
* Do not delete target wallet, do not fail migration on item-error, tweak logs

Signed-off-by: Patrik Stas <patrik.stas@absa.africa>
  • Loading branch information
Patrik-Stas authored Oct 3, 2023
1 parent 725797b commit 34c35de
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 81 deletions.
2 changes: 1 addition & 1 deletion aries_vcx/tests/utils/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
let old_wh = self.profile.wallet_handle().unwrap();
let new_wh = migrate_to_new_wallet(old_wh).await;
let wallet = Arc::new(IndySdkWallet::new(new_wh));
let profile = dev_build_profile_modular(self.genesis_file_path.clone(), wallet.clone());
let profile = dev_build_profile_modular(self.genesis_file_path.clone(), wallet);

TestAgent {
profile,
Expand Down
25 changes: 11 additions & 14 deletions libvcx_core/src/api_vcx/api_global/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aries_vcx::{
base_wallet::BaseWallet,
indy::{
internal::{close_search_wallet, fetch_next_records_wallet, open_search_wallet},
wallet::{close_wallet, create_and_open_wallet, delete_wallet, import},
wallet::{close_wallet, create_indy_wallet, import, open_wallet},
IndySdkWallet, IssuerConfig, RestoreWalletConfigs, WalletConfig,
},
structs_io::UnpackMessageOutput,
Expand Down Expand Up @@ -282,7 +282,11 @@ pub async fn wallet_import(config: &RestoreWalletConfigs) -> LibvcxResult<()> {

pub async fn wallet_migrate(wallet_config: &WalletConfig) -> LibvcxResult<()> {
let src_wallet_handle = get_main_wallet_handle()?;
let dest_wallet_handle = create_and_open_wallet(wallet_config).await?;
info!("Assuring target wallet exists.");
create_indy_wallet(wallet_config).await?;
info!("Opening target wallet.");
let dest_wallet_handle = open_wallet(wallet_config).await?;
info!("Target wallet is ready.");

let migration_res = wallet_migrator::migrate_wallet(
src_wallet_handle,
Expand All @@ -291,18 +295,11 @@ pub async fn wallet_migrate(wallet_config: &WalletConfig) -> LibvcxResult<()> {
)
.await;

if let Err(e) = migration_res {
close_wallet(dest_wallet_handle).await.ok();
delete_wallet(wallet_config).await.ok();
Err(LibvcxError::from_msg(
LibvcxErrorKind::WalletMigrationFailed,
e,
))
} else {
setup_wallet(dest_wallet_handle)?;
close_wallet(src_wallet_handle).await?;
Ok(())
}
info!("Closing source and target wallets");
close_wallet(src_wallet_handle).await.ok();
close_wallet(dest_wallet_handle).await.ok();

migration_res.map_err(|e| LibvcxError::from_msg(LibvcxErrorKind::WalletMigrationFailed, e))
}

#[allow(clippy::unwrap_used)]
Expand Down
16 changes: 14 additions & 2 deletions libvdrtools/indy-api-types/src/domain/wallet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use serde_json::value::Value;

Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct KeyConfig {
pub seed: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct Record {
// Wallet record type
#[serde(rename = "type")]
Expand All @@ -88,6 +88,18 @@ pub struct Record {
pub tags: HashMap<String, String>,
}

impl fmt::Debug for Record {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Record")
.field("type_", &self.type_)
.field("id", &self.id)
// Censor the value
.field("value", &"******".to_string())
.field("tags", &self.tags)
.finish()
}
}

pub type Tags = HashMap<String, String>;

impl Validatable for Config {
Expand Down
2 changes: 1 addition & 1 deletion libvdrtools/indy-wallet/src/export_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ where
)?;

wallet
.add(&record.type_, &record.id, &record.value, &record.tags)
.add(&record.type_, &record.id, &record.value, &record.tags, true)
.await?;
}

Expand Down
165 changes: 121 additions & 44 deletions libvdrtools/indy-wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
collections::{HashMap, HashSet},
fs,
fmt, fs,
io::BufReader,
path::PathBuf,
sync::{Arc, Mutex},
Expand All @@ -18,7 +18,7 @@ use indy_utils::{
crypto::chacha20poly1305_ietf::{self, Key as MasterKey},
secret,
};
use log::{info, trace};
use log::{error, info, trace, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value as SValue;

Expand All @@ -44,6 +44,14 @@ mod cache;
mod export_import;
mod wallet;

#[derive(Debug)]
pub struct MigrationResult {
migrated: u32,
skipped: u32,
duplicated: u32,
failed: u32,
}

pub struct WalletService {
storage_types: Mutex<HashMap<String, Arc<dyn WalletStorageType>>>,
wallets: Mutex<HashMap<WalletHandle, Arc<Wallet>>>,
Expand Down Expand Up @@ -354,7 +362,7 @@ impl WalletService {
) -> IndyResult<()> {
let wallet = self.get_wallet(wallet_handle).await?;
wallet
.add(type_, name, value, tags)
.add(type_, name, value, tags, true)
.await
.map_err(|err| WalletService::_map_wallet_storage_error(err, type_, name))
}
Expand Down Expand Up @@ -706,7 +714,7 @@ impl WalletService {
old_wh: WalletHandle,
new_wh: WalletHandle,
mut migrate_fn: impl FnMut(Record) -> Result<Option<Record>, E>,
) -> IndyResult<()>
) -> IndyResult<MigrationResult>
where
E: std::fmt::Display,
{
Expand All @@ -716,53 +724,111 @@ impl WalletService {
let mut records = old_wallet.get_all().await?;
let total = records.get_total_count()?;
info!("Migrating {total:?} records");
let mut num_records = 0;
let mut num_record = 0;
let mut migration_result = MigrationResult {
migrated: 0,
skipped: 0,
duplicated: 0,
failed: 0,
};

while let Some(WalletRecord {
type_,
id,
value,
tags,
}) = records.next().await?
{
num_records += 1;
if num_records % 1000 == 1 {
info!("Migrating wallet record number {num_records} / {total:?}");
while let Some(source_record) = records.next().await? {
num_record += 1;
if num_record % 1000 == 1 {
warn!(
"Migrating wallet record number {num_record} / {total:?}, intermediary \
migration result: ${migration_result:?}"
);
}
trace!("Migrating record: {:?}", source_record);
let unwrapped_type_ = match &source_record.type_ {
None => {
warn!(
"Skipping item missing 'type' field, record ({num_record}): \
{source_record:?}"
);
migration_result.skipped += 1;
continue;
}
Some(type_) => type_.clone(),
};
let unwrapped_value = match &source_record.value {
None => {
warn!(
"Skipping item missing 'value' field, record ({num_record}): \
{source_record:?}"
);
migration_result.skipped += 1;
continue;
}
Some(value) => value.clone(),
};
let unwrapped_tags = match &source_record.tags {
None => HashMap::new(),
Some(tags) => tags.clone(),
};

let record = Record {
type_: type_.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No type fetched for exported record",
)
})?,
id,
value: value.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No value fetched for exported record",
)
})?,
tags: tags.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No tags fetched for exported record",
)
})?,
type_: unwrapped_type_,
id: source_record.id.clone(),
value: unwrapped_value,
tags: unwrapped_tags,
};

if let Some(record) = migrate_fn(record)
.map_err(|e| IndyError::from_msg(IndyErrorKind::InvalidStructure, e.to_string()))?
let migrated_record = match migrate_fn(record) {
Ok(record) => match record {
None => {
warn!("Skipping non-migratable record ({num_record}): {source_record:?}");
migration_result.skipped += 1;
continue;
}
Some(record) => record,
},
Err(err) => {
warn!(
"Skipping item due failed item migration, record ({num_record}): \
{source_record:?}, err: {err}"
);
migration_result.failed += 1;
continue;
}
};

match new_wallet
.add(
&migrated_record.type_,
&migrated_record.id,
&migrated_record.value,
&migrated_record.tags,
false,
)
.await
{
new_wallet
.add(&record.type_, &record.id, &record.value, &record.tags)
.await?;
Err(err) => match err.kind() {
IndyErrorKind::WalletItemAlreadyExists => {
trace!(
"Record type: {migrated_record:?} already exists in destination \
wallet, skipping"
);
migration_result.duplicated += 1;
continue;
}
_ => {
error!(
"Error adding record {migrated_record:?} to destination wallet: \
{err:?}"
);
migration_result.failed += 1;
return Err(err);
}
},
Ok(()) => {
migration_result.migrated += 1;
}
}
}

info!("{num_records} / {total:?} records have been migrated!");

Ok(())
warn!("Migration of total {total:?} records completed, result: ${migration_result:?}");
Ok(migration_result)
}

pub async fn export_wallet(
Expand Down Expand Up @@ -1073,7 +1139,7 @@ pub struct MetadataRaw {
pub keys: Vec<u8>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WalletRecord {
#[serde(rename = "type")]
type_: Option<String>,
Expand All @@ -1082,6 +1148,17 @@ pub struct WalletRecord {
tags: Option<Tags>,
}

impl fmt::Debug for WalletRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalletRecord")
.field("type_", &self.type_)
.field("id", &self.id)
.field("value", &self.value.as_ref().map(|_| "******"))
.field("tags", &self.tags)
.finish()
}
}

impl Ord for WalletRecord {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
(&self.type_, &self.id).cmp(&(&other.type_, &other.id))
Expand Down
5 changes: 4 additions & 1 deletion libvdrtools/indy-wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl Wallet {
name: &str,
value: &str,
tags: &HashMap<String, String>,
cache_record: bool,
) -> IndyResult<()> {
let etype = encrypt_as_searchable(
type_.as_bytes(),
Expand All @@ -188,7 +189,9 @@ impl Wallet {
);

self.storage.add(&etype, &ename, &evalue, &etags).await?;
self.cache.add(type_, &etype, &ename, &evalue, &etags);
if cache_record {
self.cache.add(type_, &etype, &ename, &evalue, &etags);
}

Ok(())
}
Expand Down
Loading

0 comments on commit 34c35de

Please sign in to comment.