Skip to content

Commit

Permalink
Update SharingStarted usages from Lazily to WhileSubscribed(). (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
prashanDYDX authored Apr 5, 2024
1 parent 48822ed commit de98092
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@ import exchange.dydx.dydxstatemanager.AbacusStateManagerProtocol
import exchange.dydx.dydxstatemanager.clientState.favorite.DydxFavoriteStoreProtocol
import exchange.dydx.trading.common.formatter.DydxFormatter
import exchange.dydx.trading.feature.shared.viewstate.SharedMarketViewState
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.plus
import javax.inject.Inject

interface MarketInfoStreaming {
Expand All @@ -42,7 +40,7 @@ class MarketInfoStream @Inject constructor(
val favoriteStore: DydxFavoriteStoreProtocol,
) : MutableMarketInfoStreaming {

private val streamScope = CoroutineScope(newSingleThreadContext("MarketInfoStream"))
private val streamScope = MainScope()

override fun update(marketId: String?) {
abacusStateManager.setMarket(marketId)
Expand All @@ -56,8 +54,7 @@ class MarketInfoStream @Inject constructor(
abacusStateManager.state.market(marketId)
}
.distinctUntilChanged()
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)

override val marketAndAsset: Flow<MarketAndAsset?> =
abacusStateManager.marketId
Expand All @@ -80,8 +77,7 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)

override val selectedSubaccountPosition: Flow<SubaccountPosition?> =
combine(
Expand All @@ -97,8 +93,7 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)

override val sharedMarketViewState: Flow<SharedMarketViewState?> =
combine(
Expand All @@ -118,8 +113,7 @@ class MarketInfoStream @Inject constructor(
}
}
.distinctUntilChanged()
.flowOn(Dispatchers.Default)
.shareIn(streamScope, SharingStarted.Lazily, 1)
.shareIn(streamScope, SharingStarted.WhileSubscribed(), 1)
}

data class MarketAndAsset(
Expand Down
Loading

0 comments on commit de98092

Please sign in to comment.