diff --git a/v4/feature/market/src/main/java/exchange/dydx/trading/feature/market/marketinfo/streams/MarketInfoStream.kt b/v4/feature/market/src/main/java/exchange/dydx/trading/feature/market/marketinfo/streams/MarketInfoStream.kt index 0fbe1bcf..d655b1a9 100644 --- a/v4/feature/market/src/main/java/exchange/dydx/trading/feature/market/marketinfo/streams/MarketInfoStream.kt +++ b/v4/feature/market/src/main/java/exchange/dydx/trading/feature/market/marketinfo/streams/MarketInfoStream.kt @@ -8,17 +8,19 @@ import exchange.dydx.abacus.output.SubaccountPosition import exchange.dydx.abacus.protocols.LocalizerProtocol import exchange.dydx.dydxstatemanager.AbacusStateManagerProtocol import exchange.dydx.dydxstatemanager.clientState.favorite.DydxFavoriteStoreProtocol +import exchange.dydx.trading.common.di.CoroutineScopes import exchange.dydx.trading.common.formatter.DydxFormatter import exchange.dydx.trading.feature.shared.viewstate.SharedMarketViewState -import kotlinx.coroutines.MainScope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.shareIn -import kotlinx.coroutines.plus import javax.inject.Inject interface MarketInfoStreaming { @@ -38,10 +40,8 @@ class MarketInfoStream @Inject constructor( val formatter: DydxFormatter, val localizer: LocalizerProtocol, val favoriteStore: DydxFavoriteStoreProtocol, + @CoroutineScopes.App private val streamScope: CoroutineScope, ) : MutableMarketInfoStreaming { - - private val streamScope = MainScope() - override fun update(marketId: String?) { abacusStateManager.setMarket(marketId) abacusStateManager.startTrade() @@ -54,7 +54,8 @@ class MarketInfoStream @Inject constructor( abacusStateManager.state.market(marketId) } .distinctUntilChanged() - .shareIn(streamScope, SharingStarted.WhileSubscribed(), 1) + .flowOn(Dispatchers.Default) + .shareIn(streamScope, SharingStarted.Lazily, 1) override val marketAndAsset: Flow = abacusStateManager.marketId @@ -77,7 +78,8 @@ class MarketInfoStream @Inject constructor( } } .distinctUntilChanged() - .shareIn(streamScope, SharingStarted.WhileSubscribed(), 1) + .flowOn(Dispatchers.Default) + .shareIn(streamScope, SharingStarted.Lazily, 1) override val selectedSubaccountPosition: Flow = combine( @@ -93,7 +95,8 @@ class MarketInfoStream @Inject constructor( } } .distinctUntilChanged() - .shareIn(streamScope, SharingStarted.WhileSubscribed(), 1) + .flowOn(Dispatchers.Default) + .shareIn(streamScope, SharingStarted.Lazily, 1) override val sharedMarketViewState: Flow = combine( @@ -113,7 +116,8 @@ class MarketInfoStream @Inject constructor( } } .distinctUntilChanged() - .shareIn(streamScope, SharingStarted.WhileSubscribed(), 1) + .flowOn(Dispatchers.Default) + .shareIn(streamScope, SharingStarted.Lazily, 1) } data class MarketAndAsset( diff --git a/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusState.kt b/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusState.kt index d961d2df..b7a0c487 100644 --- a/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusState.kt +++ b/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusState.kt @@ -42,8 +42,9 @@ import exchange.dydx.dydxstatemanager.clientState.transfers.DydxTransferInstance import exchange.dydx.dydxstatemanager.clientState.transfers.DydxTransferState import exchange.dydx.dydxstatemanager.clientState.wallets.DydxWalletInstance import exchange.dydx.dydxstatemanager.clientState.wallets.DydxWalletState +import exchange.dydx.trading.common.di.CoroutineScopes +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.MainScope import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combine @@ -62,10 +63,8 @@ class AbacusState( val transferState: StateFlow, private val abacusStateManager: SingletonAsyncAbacusStateManagerProtocol, private val parser: ParserProtocol, + @CoroutineScopes.App private val stateManagerScope: CoroutineScope, ) { - - private val stateManagerScope = MainScope() - var isMainNet: Boolean? = null get() = abacusStateManager.environment?.isMainNet ?: false @@ -79,7 +78,7 @@ class AbacusState( currentWallet.cosmoAddress?.isNotEmpty() == true } ?: false } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), false) + .stateIn(stateManagerScope, SharingStarted.Lazily, false) } /** @@ -88,7 +87,7 @@ class AbacusState( val currentWallet: StateFlow by lazy { walletState .map { it?.currentWallet } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val transfers: StateFlow?> by lazy { @@ -98,13 +97,13 @@ class AbacusState( val subaccountNumber = subaccountNumber ?: return@map null it?.get(subaccountNumber)?.toList() } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } fun transferInstance(transactionHash: String?): StateFlow { return transferState .map { it?.transfers?.first { it.transactionHash == transactionHash } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -113,7 +112,7 @@ class AbacusState( val transferStatuses: StateFlow> by lazy { perpetualState .map { it?.transferStatuses ?: emptyMap() } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), emptyMap()) + .stateIn(stateManagerScope, SharingStarted.Lazily, emptyMap()) } /** @@ -122,13 +121,13 @@ class AbacusState( val account: StateFlow by lazy { perpetualState .map { it?.account } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val hasAccount: StateFlow by lazy { account .map { it != null } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), false) + .stateIn(stateManagerScope, SharingStarted.Lazily, false) } /** @@ -140,7 +139,7 @@ class AbacusState( .map { state: PerpetualState? -> state?.account?.balances?.get(tokenDenom)?.amount?.toDoubleOrNull() } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } fun stakingBalance(tokenDenom: String?): StateFlow { @@ -148,7 +147,7 @@ class AbacusState( .map { state: PerpetualState? -> state?.account?.stakingBalances?.get(tokenDenom)?.amount?.toDoubleOrNull() } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -157,7 +156,7 @@ class AbacusState( fun subaccount(subaccountNumber: String): StateFlow { return account .map { it?.subaccounts?.get(subaccountNumber) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccount: StateFlow by lazy { @@ -169,7 +168,7 @@ class AbacusState( null } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccountFills: StateFlow?> by lazy { @@ -181,7 +180,7 @@ class AbacusState( null } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccountFundings: StateFlow?> by lazy { @@ -193,7 +192,7 @@ class AbacusState( null } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccountPositions: StateFlow?> by lazy { @@ -201,7 +200,7 @@ class AbacusState( .map { subaccount -> subaccount?.openPositions } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccountOrders: StateFlow?> by lazy { @@ -209,7 +208,7 @@ class AbacusState( .map { subaccount -> subaccount?.orders } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val selectedSubaccountPNLs: StateFlow?> by lazy { @@ -221,7 +220,7 @@ class AbacusState( null } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -230,7 +229,7 @@ class AbacusState( val historicalFundingsMap: StateFlow>?> by lazy { perpetualState .map { it?.historicalFundings } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -239,7 +238,7 @@ class AbacusState( fun historicalFundings(marketId: String): StateFlow?> { return historicalFundingsMap .map { it?.get(marketId) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -250,7 +249,7 @@ class AbacusState( .map { it?.marketsSummary } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -259,7 +258,7 @@ class AbacusState( val candlesMap: StateFlow?> by lazy { perpetualState .map { it?.candles } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -268,7 +267,7 @@ class AbacusState( fun candles(marketId: String): StateFlow { return candlesMap .map { it?.get(marketId) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -279,7 +278,7 @@ class AbacusState( .map { it?.orderbooks } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -288,7 +287,7 @@ class AbacusState( fun orderbook(marketId: String): StateFlow { return orderbooksMap .map { it?.get(marketId) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -297,7 +296,7 @@ class AbacusState( val tradesMap: StateFlow>?> by lazy { perpetualState .map { it?.trades } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -306,7 +305,7 @@ class AbacusState( fun trade(marketId: String): StateFlow?> { return tradesMap .map { it?.get(marketId) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -315,7 +314,7 @@ class AbacusState( val markeeIds: StateFlow?> by lazy { marketSummary .map { it?.marketIds() } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -326,7 +325,7 @@ class AbacusState( .map { it?.markets } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -341,7 +340,7 @@ class AbacusState( map?.get(id) } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -352,7 +351,7 @@ class AbacusState( .map { it?.get(marketId) } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -361,7 +360,7 @@ class AbacusState( val assetMap: StateFlow?> by lazy { perpetualState .map { it?.assets } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -379,7 +378,7 @@ class AbacusState( } output } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -389,7 +388,7 @@ class AbacusState( perpetualState .map { it?.input?.trade } .throttleTime(10, throttleConfiguration = ThrottleConfiguration.LEADING_AND_TRAILING) - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -399,7 +398,7 @@ class AbacusState( perpetualState .map { it?.input?.closePosition } .throttleTime(10, throttleConfiguration = ThrottleConfiguration.LEADING_AND_TRAILING) - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -409,7 +408,7 @@ class AbacusState( perpetualState .map { it?.input?.transfer } .throttleTime(10, throttleConfiguration = ThrottleConfiguration.LEADING_AND_TRAILING) - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -419,7 +418,7 @@ class AbacusState( perpetualState .map { it?.input?.receiptLines ?: emptyList() } .throttleTime(10, throttleConfiguration = ThrottleConfiguration.LEADING_AND_TRAILING) - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), emptyList()) + .stateIn(stateManagerScope, SharingStarted.Lazily, emptyList()) } /** @@ -429,7 +428,7 @@ class AbacusState( perpetualState .map { it?.input?.errors ?: emptyList() } .throttleTime(10, throttleConfiguration = ThrottleConfiguration.LEADING_AND_TRAILING) - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), emptyList()) + .stateIn(stateManagerScope, SharingStarted.Lazily, emptyList()) } /** @@ -437,7 +436,7 @@ class AbacusState( */ val lastOrder: StateFlow by lazy { lastOrderPublisher - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -453,7 +452,7 @@ class AbacusState( } } } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -462,7 +461,7 @@ class AbacusState( val configs: StateFlow by lazy { perpetualState .map { it?.configs } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -471,7 +470,7 @@ class AbacusState( val user: StateFlow by lazy { perpetualState .map { it?.wallet?.user } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -482,7 +481,7 @@ class AbacusState( .map { it?.launchIncentive } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } val launchIncentivePoints: StateFlow by lazy { @@ -490,7 +489,7 @@ class AbacusState( .map { it?.account?.launchIncentivePoints } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), null) + .stateIn(stateManagerScope, SharingStarted.Lazily, null) } /** @@ -504,7 +503,7 @@ class AbacusState( val restriction: StateFlow by lazy { perpetualState .map { it?.restriction?.restriction ?: Restriction.NO_RESTRICTION } - .stateIn(stateManagerScope, SharingStarted.WhileSubscribed(), Restriction.NO_RESTRICTION) + .stateIn(stateManagerScope, SharingStarted.Lazily, Restriction.NO_RESTRICTION) } /** diff --git a/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusStateManager.kt b/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusStateManager.kt index 8e94edb9..f4403037 100644 --- a/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusStateManager.kt +++ b/v4/integration/dydxStateManager/src/main/java/exchange/dydx/dydxstatemanager/AbacusStateManager.kt @@ -198,6 +198,7 @@ class AbacusStateManager @Inject constructor( abacusStateManager = asyncStateManager, transferState = transferStateManager.state, parser = parser, + stateManagerScope = appScope, ) override val deploymentUri: String