Skip to content

Commit

Permalink
Revert "Update SharingStarted usages from Lazily to WhileSubscribed(). (
Browse files Browse the repository at this point in the history
#62)"

This reverts commit de98092.
  • Loading branch information
ruixhuang committed Apr 11, 2024
1 parent 41f045b commit 4613710
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import exchange.dydx.abacus.output.SubaccountPosition
import exchange.dydx.abacus.protocols.LocalizerProtocol
import exchange.dydx.dydxstatemanager.AbacusStateManagerProtocol
import exchange.dydx.dydxstatemanager.clientState.favorite.DydxFavoriteStoreProtocol
import exchange.dydx.trading.common.di.CoroutineScopes
import exchange.dydx.trading.common.formatter.DydxFormatter
import exchange.dydx.trading.feature.shared.viewstate.SharedMarketViewState
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.plus
import javax.inject.Inject

interface MarketInfoStreaming {
Expand All @@ -38,10 +40,8 @@ class MarketInfoStream @Inject constructor(
val formatter: DydxFormatter,
val localizer: LocalizerProtocol,
val favoriteStore: DydxFavoriteStoreProtocol,
@CoroutineScopes.App private val streamScope: CoroutineScope,
) : MutableMarketInfoStreaming {

private val streamScope = MainScope()

override fun update(marketId: String?) {
abacusStateManager.setMarket(marketId)
abacusStateManager.startTrade()
Expand All @@ -54,7 +54,8 @@ class MarketInfoStream @Inject constructor(
abacusStateManager.state.market(marketId)
}
.distinctUntilChanged()
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)

override val marketAndAsset: Flow<MarketAndAsset?> =
abacusStateManager.marketId
Expand All @@ -77,7 +78,8 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)

override val selectedSubaccountPosition: Flow<SubaccountPosition?> =
combine(
Expand All @@ -93,7 +95,8 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)

override val sharedMarketViewState: Flow<SharedMarketViewState?> =
combine(
Expand All @@ -113,7 +116,8 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)
}

data class MarketAndAsset(
Expand Down
Loading

0 comments on commit 4613710

Please sign in to comment.