Skip to content

Commit

Permalink
use guard in NftStorageBuilder to lock it
Browse files Browse the repository at this point in the history
  • Loading branch information
laruh committed Jul 13, 2023
1 parent e7c5c55 commit eaed80e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 14 deletions.
19 changes: 12 additions & 7 deletions mm2src/coins/nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub type WithdrawNftResult = Result<TransactionNftDetails, MmError<WithdrawError

/// `get_nft_list` function returns list of NFTs on requested chains owned by user.
pub async fn get_nft_list(ctx: MmArc, req: NftListReq) -> MmResult<NftList, GetNftInfoError> {
let storage = NftStorageBuilder::new(&ctx).build()?;
let storage = NftStorageBuilder::new(&ctx)?.build()?;
for chain in req.chains.iter() {
if !NftListStorageOps::is_initialized(&storage, chain).await? {
NftListStorageOps::init(&storage, chain).await?;
Expand All @@ -62,7 +62,7 @@ pub async fn get_nft_list(ctx: MmArc, req: NftListReq) -> MmResult<NftList, GetN

/// `get_nft_metadata` function returns info of one specific NFT.
pub async fn get_nft_metadata(ctx: MmArc, req: NftMetadataReq) -> MmResult<Nft, GetNftInfoError> {
let storage = NftStorageBuilder::new(&ctx).build()?;
let storage = NftStorageBuilder::new(&ctx)?.build()?;
if !NftListStorageOps::is_initialized(&storage, &req.chain).await? {
NftListStorageOps::init(&storage, &req.chain).await?;
}
Expand All @@ -82,7 +82,7 @@ pub async fn get_nft_metadata(ctx: MmArc, req: NftMetadataReq) -> MmResult<Nft,

/// `get_nft_transfers` function returns a transfer history of NFTs on requested chains owned by user.
pub async fn get_nft_transfers(ctx: MmArc, req: NftTransfersReq) -> MmResult<NftsTransferHistoryList, GetNftInfoError> {
let storage = NftStorageBuilder::new(&ctx).build()?;
let storage = NftStorageBuilder::new(&ctx)?.build()?;
for chain in req.chains.iter() {
if !NftTxHistoryStorageOps::is_initialized(&storage, chain).await? {
NftTxHistoryStorageOps::init(&storage, chain).await?;
Expand All @@ -102,7 +102,10 @@ pub async fn get_nft_transfers(ctx: MmArc, req: NftTransfersReq) -> MmResult<Nft

/// `update_nft` function updates cache of nft transfer history and nft list.
pub async fn update_nft(ctx: MmArc, req: UpdateNftReq) -> MmResult<(), UpdateNftError> {
let storage = NftStorageBuilder::new(&ctx).build()?;
let mut storage_builder = NftStorageBuilder::new(&ctx)?;
let storage = storage_builder.build()?;
let _lock = storage_builder.lock().await;

for chain in req.chains.iter() {
let tx_history_initialized = NftTxHistoryStorageOps::is_initialized(&storage, chain).await?;

Expand Down Expand Up @@ -162,7 +165,9 @@ pub async fn update_nft(ctx: MmArc, req: UpdateNftReq) -> MmResult<(), UpdateNft
}

pub async fn refresh_nft_metadata(ctx: MmArc, req: RefreshMetadataReq) -> MmResult<(), UpdateNftError> {
let storage = NftStorageBuilder::new(&ctx).build()?;
let mut storage_builder = NftStorageBuilder::new(&ctx)?;
let storage = storage_builder.build()?;
let _lock = storage_builder.lock();
let moralis_meta = get_moralis_metadata(
format!("{:#02x}", req.token_address),
req.token_id.clone(),
Expand All @@ -176,7 +181,7 @@ pub async fn refresh_nft_metadata(ctx: MmArc, req: RefreshMetadataReq) -> MmResu
chain: req.chain,
protect_from_spam: false,
};
let mut nft_db = get_nft_metadata(ctx, req).await?;
let mut nft_db = get_nft_metadata(ctx.clone(), req).await?;
let token_uri = check_moralis_ipfs_bafy(moralis_meta.common.token_uri.as_deref());
let uri_meta = get_uri_meta(token_uri.as_deref(), moralis_meta.common.metadata.as_deref()).await;
nft_db.common.collection_name = moralis_meta.common.collection_name;
Expand Down Expand Up @@ -717,7 +722,7 @@ pub(crate) async fn find_wallet_nft_amount(
token_address: String,
token_id: BigDecimal,
) -> MmResult<BigDecimal, GetNftInfoError> {
let storage = NftStorageBuilder::new(ctx).build()?;
let storage = NftStorageBuilder::new(ctx)?.build()?;
if !NftListStorageOps::is_initialized(&storage, chain).await? {
NftListStorageOps::init(&storage, chain).await?;
}
Expand Down
17 changes: 17 additions & 0 deletions mm2src/coins/nft/nft_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use crate::nft::eth_add_to_hex;
use crate::{TransactionType, TxFeeDetails, WithdrawFee};
use common::ten;
use ethereum_types::Address;
use futures::lock::Mutex as AsyncMutex;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_number::BigDecimal;
use rpc::v1::types::Bytes as BytesJson;
use serde::Deserialize;
use serde_json::Value as Json;
use std::fmt;
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -456,3 +459,17 @@ impl From<Nft> for TxMeta {
}
}
}

pub(crate) struct NftCtx {
pub(crate) guard: Arc<AsyncMutex<()>>,
}

impl NftCtx {
pub(crate) fn from_ctx(ctx: &MmArc) -> Result<Arc<NftCtx>, String> {
Ok(try_s!(from_ctx(&ctx.nft_ctx, move || {
Ok(NftCtx {
guard: Arc::new(AsyncMutex::new(())),
})
})))
}
}
4 changes: 2 additions & 2 deletions mm2src/coins/nft/storage/db_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ fn nft_tx_history() -> Vec<NftTransferHistory> {

async fn init_nft_list_storage(chain: &Chain) -> impl NftListStorageOps + NftTxHistoryStorageOps {
let ctx = mm_ctx_with_custom_db();
let storage = NftStorageBuilder::new(&ctx).build().unwrap();
let storage = NftStorageBuilder::new(&ctx).unwrap().build().unwrap();
NftListStorageOps::init(&storage, chain).await.unwrap();
let is_initialized = NftListStorageOps::is_initialized(&storage, chain).await.unwrap();
assert!(is_initialized);
Expand All @@ -300,7 +300,7 @@ async fn init_nft_list_storage(chain: &Chain) -> impl NftListStorageOps + NftTxH

async fn init_nft_history_storage(chain: &Chain) -> impl NftListStorageOps + NftTxHistoryStorageOps {
let ctx = mm_ctx_with_custom_db();
let storage = NftStorageBuilder::new(&ctx).build().unwrap();
let storage = NftStorageBuilder::new(&ctx).unwrap().build().unwrap();
NftTxHistoryStorageOps::init(&storage, chain).await.unwrap();
let is_initialized = NftTxHistoryStorageOps::is_initialized(&storage, chain).await.unwrap();
assert!(is_initialized);
Expand Down
28 changes: 23 additions & 5 deletions mm2src/coins/nft/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::nft::nft_structs::{Chain, Nft, NftList, NftTokenAddrId, NftTransferHistory, NftTxHistoryFilters,
use crate::nft::nft_structs::{Chain, Nft, NftCtx, NftList, NftTokenAddrId, NftTransferHistory, NftTxHistoryFilters,
NftsTransferHistoryList, TxMeta};
use crate::WithdrawError;
use async_trait::async_trait;
use derive_more::Display;
use futures::lock::Mutex as AsyncMutex;
use futures::lock::MutexLockFuture;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::mm_error::MmResult;
use mm2_err_handle::mm_error::{NotEqual, NotMmError};
use mm2_number::BigDecimal;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::Arc;

#[cfg(any(test, target_arch = "wasm32"))]
pub(crate) mod db_test_helpers;
Expand Down Expand Up @@ -154,23 +157,38 @@ impl From<CreateNftStorageError> for WithdrawError {
}
}

/// `NftStorageBuilder` is used to create an instance that implements the `NftListStorageOps`
/// and `NftTxHistoryStorageOps` traits.
/// `NftStorageBuilder` is used to create an instance that implements the [`NftListStorageOps`]
/// and [`NftTxHistoryStorageOps`] traits.Also has guard to lock write operations.
pub struct NftStorageBuilder<'a> {
ctx: &'a MmArc,
guard: Arc<AsyncMutex<()>>,
}

impl<'a> NftStorageBuilder<'a> {
#[inline]
pub fn new(ctx: &MmArc) -> NftStorageBuilder<'_> { NftStorageBuilder { ctx } }
pub fn new(ctx: &MmArc) -> MmResult<NftStorageBuilder<'_>, CreateNftStorageError> {
let nft_ctx = NftCtx::from_ctx(ctx).map_err(CreateNftStorageError::Internal)?;
let builder = NftStorageBuilder {
ctx,
guard: nft_ctx.guard.clone(),
};
Ok(builder)
}

/// `build` function is used to build nft storage which implements [`NftListStorageOps`] and [`NftTxHistoryStorageOps`] traits.
#[inline]
pub fn build(self) -> MmResult<impl NftListStorageOps + NftTxHistoryStorageOps, CreateNftStorageError> {
pub fn build(&self) -> MmResult<impl NftListStorageOps + NftTxHistoryStorageOps, CreateNftStorageError> {
#[cfg(target_arch = "wasm32")]
return wasm::wasm_storage::IndexedDbNftStorage::new(self.ctx);
#[cfg(not(target_arch = "wasm32"))]
sql_storage::SqliteNftStorage::new(self.ctx)
}

/// `lock` is used at the beginning of functions where we need to update database, so
/// we can avoid race condition.
/// Also it prevents sending identical moralis requests several times in write operations.
#[inline]
pub fn lock(&mut self) -> MutexLockFuture<'_, ()> { self.guard.lock() }
}

/// `get_offset_limit` function calculates offset and limit for final result if we use pagination.
Expand Down
3 changes: 3 additions & 0 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ pub struct MmCtx {
pub graceful_shutdown_registry: graceful_shutdown::GracefulShutdownRegistry,
#[cfg(target_arch = "wasm32")]
pub db_namespace: DbNamespaceId,
/// The context belonging to the `nft` mod: `NftCtx`.
pub nft_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
}

impl MmCtx {
Expand Down Expand Up @@ -162,6 +164,7 @@ impl MmCtx {
graceful_shutdown_registry: graceful_shutdown::GracefulShutdownRegistry::default(),
#[cfg(target_arch = "wasm32")]
db_namespace: DbNamespaceId::Main,
nft_ctx: Mutex::new(None),
}
}

Expand Down

0 comments on commit eaed80e

Please sign in to comment.