Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not delete target wallet, do not fail migration on item-error #1006

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aries_vcx/tests/utils/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
let old_wh = self.profile.wallet_handle().unwrap();
let new_wh = migrate_to_new_wallet(old_wh).await;
let wallet = Arc::new(IndySdkWallet::new(new_wh));
let profile = dev_build_profile_modular(self.genesis_file_path.clone(), wallet.clone());
let profile = dev_build_profile_modular(self.genesis_file_path.clone(), wallet);

TestAgent {
profile,
Expand Down
25 changes: 11 additions & 14 deletions libvcx_core/src/api_vcx/api_global/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aries_vcx::{
base_wallet::BaseWallet,
indy::{
internal::{close_search_wallet, fetch_next_records_wallet, open_search_wallet},
wallet::{close_wallet, create_and_open_wallet, delete_wallet, import},
wallet::{close_wallet, create_indy_wallet, import, open_wallet},
IndySdkWallet, IssuerConfig, RestoreWalletConfigs, WalletConfig,
},
structs_io::UnpackMessageOutput,
Expand Down Expand Up @@ -282,7 +282,11 @@ pub async fn wallet_import(config: &RestoreWalletConfigs) -> LibvcxResult<()> {

pub async fn wallet_migrate(wallet_config: &WalletConfig) -> LibvcxResult<()> {
let src_wallet_handle = get_main_wallet_handle()?;
let dest_wallet_handle = create_and_open_wallet(wallet_config).await?;
info!("Assuring target wallet exists.");
create_indy_wallet(wallet_config).await?;
info!("Opening target wallet.");
let dest_wallet_handle = open_wallet(wallet_config).await?;
info!("Target wallet is ready.");

let migration_res = wallet_migrator::migrate_wallet(
src_wallet_handle,
Expand All @@ -291,18 +295,11 @@ pub async fn wallet_migrate(wallet_config: &WalletConfig) -> LibvcxResult<()> {
)
.await;

if let Err(e) = migration_res {
close_wallet(dest_wallet_handle).await.ok();
delete_wallet(wallet_config).await.ok();
Err(LibvcxError::from_msg(
LibvcxErrorKind::WalletMigrationFailed,
e,
))
} else {
setup_wallet(dest_wallet_handle)?;
close_wallet(src_wallet_handle).await?;
Ok(())
}
Comment on lines -294 to -305
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not delete the destination wallet on failure?

Copy link
Contributor Author

@Patrik-Stas Patrik-Stas Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to rerun migration against the same target repeatedly, with predictable result. If I get IO error midway, I want to be able to retry.

info!("Closing source and target wallets");
close_wallet(src_wallet_handle).await.ok();
close_wallet(dest_wallet_handle).await.ok();

migration_res.map_err(|e| LibvcxError::from_msg(LibvcxErrorKind::WalletMigrationFailed, e))
}

#[allow(clippy::unwrap_used)]
Expand Down
16 changes: 14 additions & 2 deletions libvdrtools/indy-api-types/src/domain/wallet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use serde_json::value::Value;

Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct KeyConfig {
pub seed: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct Record {
// Wallet record type
#[serde(rename = "type")]
Expand All @@ -88,6 +88,18 @@ pub struct Record {
pub tags: HashMap<String, String>,
}

impl fmt::Debug for Record {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Record")
.field("type_", &self.type_)
.field("id", &self.id)
// Censor the value
.field("value", &"******".to_string())
.field("tags", &self.tags)
.finish()
}
}

pub type Tags = HashMap<String, String>;

impl Validatable for Config {
Expand Down
2 changes: 1 addition & 1 deletion libvdrtools/indy-wallet/src/export_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ where
)?;

wallet
.add(&record.type_, &record.id, &record.value, &record.tags)
.add(&record.type_, &record.id, &record.value, &record.tags, true)
.await?;
}

Expand Down
165 changes: 121 additions & 44 deletions libvdrtools/indy-wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
collections::{HashMap, HashSet},
fs,
fmt, fs,
io::BufReader,
path::PathBuf,
sync::{Arc, Mutex},
Expand All @@ -18,7 +18,7 @@ use indy_utils::{
crypto::chacha20poly1305_ietf::{self, Key as MasterKey},
secret,
};
use log::{info, trace};
use log::{error, info, trace, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value as SValue;

Expand All @@ -44,6 +44,14 @@ mod cache;
mod export_import;
mod wallet;

#[derive(Debug)]
pub struct MigrationResult {
migrated: u32,
skipped: u32,
duplicated: u32,
failed: u32,
}

pub struct WalletService {
storage_types: Mutex<HashMap<String, Arc<dyn WalletStorageType>>>,
wallets: Mutex<HashMap<WalletHandle, Arc<Wallet>>>,
Expand Down Expand Up @@ -354,7 +362,7 @@ impl WalletService {
) -> IndyResult<()> {
let wallet = self.get_wallet(wallet_handle).await?;
wallet
.add(type_, name, value, tags)
.add(type_, name, value, tags, true)
.await
.map_err(|err| WalletService::_map_wallet_storage_error(err, type_, name))
}
Expand Down Expand Up @@ -706,7 +714,7 @@ impl WalletService {
old_wh: WalletHandle,
new_wh: WalletHandle,
mut migrate_fn: impl FnMut(Record) -> Result<Option<Record>, E>,
) -> IndyResult<()>
) -> IndyResult<MigrationResult>
where
E: std::fmt::Display,
{
Expand All @@ -716,53 +724,111 @@ impl WalletService {
let mut records = old_wallet.get_all().await?;
let total = records.get_total_count()?;
info!("Migrating {total:?} records");
let mut num_records = 0;
let mut num_record = 0;
let mut migration_result = MigrationResult {
migrated: 0,
skipped: 0,
duplicated: 0,
failed: 0,
};

while let Some(WalletRecord {
type_,
id,
value,
tags,
}) = records.next().await?
{
num_records += 1;
if num_records % 1000 == 1 {
info!("Migrating wallet record number {num_records} / {total:?}");
while let Some(source_record) = records.next().await? {
num_record += 1;
if num_record % 1000 == 1 {
warn!(
"Migrating wallet record number {num_record} / {total:?}, intermediary \
migration result: ${migration_result:?}"
);
}
trace!("Migrating record: {:?}", source_record);
let unwrapped_type_ = match &source_record.type_ {
None => {
warn!(
"Skipping item missing 'type' field, record ({num_record}): \
{source_record:?}"
);
migration_result.skipped += 1;
continue;
}
Some(type_) => type_.clone(),
};
let unwrapped_value = match &source_record.value {
None => {
warn!(
"Skipping item missing 'value' field, record ({num_record}): \
{source_record:?}"
);
migration_result.skipped += 1;
continue;
}
Some(value) => value.clone(),
};
let unwrapped_tags = match &source_record.tags {
None => HashMap::new(),
Some(tags) => tags.clone(),
};

let record = Record {
type_: type_.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No type fetched for exported record",
)
})?,
id,
value: value.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No value fetched for exported record",
)
})?,
tags: tags.ok_or_else(|| {
err_msg(
IndyErrorKind::InvalidState,
"No tags fetched for exported record",
)
})?,
type_: unwrapped_type_,
id: source_record.id.clone(),
value: unwrapped_value,
tags: unwrapped_tags,
};

if let Some(record) = migrate_fn(record)
.map_err(|e| IndyError::from_msg(IndyErrorKind::InvalidStructure, e.to_string()))?
let migrated_record = match migrate_fn(record) {
Ok(record) => match record {
None => {
warn!("Skipping non-migratable record ({num_record}): {source_record:?}");
migration_result.skipped += 1;
continue;
}
Some(record) => record,
},
Err(err) => {
warn!(
"Skipping item due failed item migration, record ({num_record}): \
{source_record:?}, err: {err}"
);
migration_result.failed += 1;
continue;
}
};

match new_wallet
.add(
&migrated_record.type_,
&migrated_record.id,
&migrated_record.value,
&migrated_record.tags,
false,
)
.await
{
new_wallet
.add(&record.type_, &record.id, &record.value, &record.tags)
.await?;
Err(err) => match err.kind() {
IndyErrorKind::WalletItemAlreadyExists => {
trace!(
"Record type: {migrated_record:?} already exists in destination \
wallet, skipping"
);
migration_result.duplicated += 1;
continue;
}
_ => {
error!(
"Error adding record {migrated_record:?} to destination wallet: \
{err:?}"
);
migration_result.failed += 1;
return Err(err);
}
},
Ok(()) => {
migration_result.migrated += 1;
}
}
}

info!("{num_records} / {total:?} records have been migrated!");

Ok(())
warn!("Migration of total {total:?} records completed, result: ${migration_result:?}");
Ok(migration_result)
}

pub async fn export_wallet(
Expand Down Expand Up @@ -1073,7 +1139,7 @@ pub struct MetadataRaw {
pub keys: Vec<u8>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WalletRecord {
#[serde(rename = "type")]
type_: Option<String>,
Expand All @@ -1082,6 +1148,17 @@ pub struct WalletRecord {
tags: Option<Tags>,
}

impl fmt::Debug for WalletRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WalletRecord")
.field("type_", &self.type_)
.field("id", &self.id)
.field("value", &self.value.as_ref().map(|_| "******"))
.field("tags", &self.tags)
.finish()
}
}

impl Ord for WalletRecord {
fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
(&self.type_, &self.id).cmp(&(&other.type_, &other.id))
Expand Down
5 changes: 4 additions & 1 deletion libvdrtools/indy-wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl Wallet {
name: &str,
value: &str,
tags: &HashMap<String, String>,
cache_record: bool,
) -> IndyResult<()> {
let etype = encrypt_as_searchable(
type_.as_bytes(),
Expand All @@ -188,7 +189,9 @@ impl Wallet {
);

self.storage.add(&etype, &ename, &evalue, &etags).await?;
self.cache.add(type_, &etype, &ename, &evalue, &etags);
if cache_record {
self.cache.add(type_, &etype, &ename, &evalue, &etags);
}
Comment on lines +192 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this change supposed to do? I see the flag is set to true in both instances where this is used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In migration, the flag is set to false.


Ok(())
}
Expand Down
Loading
Loading