- FlowExt is a Kotlin Multiplatform library that provides many operators and extensions for Kotlin Coroutines Flow.
- FlowExt provides a collection of operators, Flows, and utilities for Flow that are not provided by kotlinx.coroutines itself but are common in other reactive frameworks (rxjs, RxJava, RxSwift, rxdart, ...) and standards.
Author: Petrus Nguyễn Thái Học
Supported targets:
- android.
- jvm.
- js (IR).
- iosArm64, iosX64, iosSimulatorArm64.
- watchosArm32, watchosArm64, watchosX64, watchosSimulatorArm64, watchosDeviceArm64.
- tvosX64, tvosSimulatorArm64, tvosArm64.
- macosX64, macosArm64.
- mingwX64.
- linuxX64, linuxArm64.
- androidNativeArm32, androidNativeArm64, androidNativeX86, androidNativeX64.
Note: This is still a relatively early version of FlowExt, with much more to be desired. I gladly accept PRs, ideas, opinions, or improvements. Thank you! :)
0.x release docs: https://hoc081098.github.io/FlowExt/docs/0.x
Snapshot docs: https://hoc081098.github.io/FlowExt/docs/latest
allprojects {
  repositories {
    ...
    mavenCentral()
  }
}
dependencies {
  implementation("io.github.hoc081098:FlowExt:0.7.3")
}
Snapshots of the development version are available in Sonatype's snapshots repository.
- Kotlin
allprojects {
  repositories {
    ...
    maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots/")
  }
}
dependencies {
  implementation("io.github.hoc081098:FlowExt:0.7.4-SNAPSHOT")
}
- Groovy
allprojects {
  repositories {
    ...
    maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
  }
}
dependencies {
  implementation("io.github.hoc081098:FlowExt:0.7.4-SNAPSHOT")
}
- Create
- Intermediate operators
  - bufferCount
  - combine
  - cast
  - castNotNull
  - castNullable
  - chunked
  - safeCast
  - concatWith
  - startWith
  - flatMapFirst
  - exhaustMap
  - flattenFirst
  - flatMapConcatEager
  - mapEager
  - flattenEager
  - exhaustAll
  - groupBy
  - ignoreElements
  - mapIndexed
  - mapTo
  - mapToUnit
  - materialize
  - dematerialize
  - raceWith
  - ambWith
  - pairwise
  - repeat
  - retryWhenWithDelayStrategy
  - retryWhenWithExponentialBackoff
  - retryWithExponentialBackoff
  - scanWith
  - select
  - skipUntil
  - dropUntil
  - takeUntil
  - throttleTime
  - withLatestFrom
  - zipWithNext
  - plus
- Similar to RxJS bufferCount
- Similar to RxJava buffer
Buffers the source Flow values until the size hits the maximum bufferSize given.
Note, chunked is an alias to bufferCount.
range(start = 0, count = 10)
.bufferCount(bufferSize = 3)
.collect { println("bufferCount: $it") }
println("---")
range(start = 0, count = 10)
.bufferCount(bufferSize = 3, startBufferEvery = 2)
.collect { println("bufferCount: $it") }
Output:
bufferCount: [0, 1, 2]
bufferCount: [3, 4, 5]
bufferCount: [6, 7, 8]
bufferCount: [9]
---
bufferCount: [0, 1, 2]
bufferCount: [2, 3, 4]
bufferCount: [4, 5, 6]
bufferCount: [6, 7, 8]
bufferCount: [8, 9]
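To make the chunking behaviour concrete, here is a minimal sketch of the non-overlapping case written with plain kotlinx.coroutines only. The name bufferCountSketch is hypothetical and this is not FlowExt's implementation; it ignores the startBufferEvery parameter entirely.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): groups values into lists of `bufferSize`.
fun <T> Flow<T>.bufferCountSketch(bufferSize: Int): Flow<List<T>> {
  require(bufferSize > 0) { "bufferSize must be positive" }
  return flow {
    var buffer = ArrayList<T>(bufferSize)
    collect { value ->
      buffer.add(value)
      if (buffer.size == bufferSize) {
        emit(buffer)                   // hand the full chunk downstream
        buffer = ArrayList(bufferSize) // start a fresh chunk
      }
    }
    if (buffer.isNotEmpty()) emit(buffer) // flush the trailing partial chunk
  }
}

fun main() = runBlocking {
  println((0 until 10).asFlow().bufferCountSketch(3).toList())
  // [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
}
```

A new list is allocated for every chunk so that a chunk already emitted downstream is never mutated afterwards.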
- Similar to RxJS concat
- Similar to RxJava concat
Creates an output Flow which sequentially emits all values from the first given Flow and then moves on to the next.
concat(
flow1 = flowOf(1, 2, 3),
flow2 = flowOf(4, 5, 6)
).collect { println("concat: $it") }
Output:
concat: 1
concat: 2
concat: 3
concat: 4
concat: 5
concat: 6
- Similar to RxJS defer
- Similar to RxJava defer
Creates a Flow that, on collection, calls a Flow factory to make a Flow for each new FlowCollector.
In some circumstances, waiting until the last minute (that is, until collection time) to generate the Flow can ensure that collectors receive the freshest data.
var count = 0L
val flow = defer {
delay(count)
flowOf(count++)
}
flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }
Output:
defer: 0
---
defer: 1
---
defer: 2
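The "factory per collector" idea can be approximated with the standard flow builder. The following is a sketch under that assumption, using a hypothetical deferSketch function; FlowExt's own defer may be implemented differently.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): the factory runs once per collection.
fun <T> deferSketch(factory: suspend () -> Flow<T>): Flow<T> = flow {
  emitAll(factory()) // build the inner Flow only now, at collection time
}

fun main() = runBlocking {
  var count = 0
  val flow = deferSketch { flowOf(count++) }
  println(flow.toList()) // [0]
  println(flow.toList()) // [1]
}
```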
- Similar to RxJava fromCallable
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromSuspend for the suspend version.
var count = 0L
val flow = flowFromNonSuspend { count++ }
flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }
Output:
flowFromNonSuspend: 0
---
flowFromNonSuspend: 1
---
flowFromNonSuspend: 2
- Similar to RxJava fromCallable
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromNonSuspend for the non-suspend version.
var count = 0L
val flow = flowFromSuspend {
delay(count)
count++
}
flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }
Output:
flowFromSuspend: 0
---
flowFromSuspend: 1
---
flowFromSuspend: 2
- Similar to RxJS interval
Returns a Flow that emits a 0L after the initialDelay and ever-increasing numbers after each period of time thereafter.
interval(initialDelay = 100.milliseconds, period = 1.seconds)
.take(5)
.collect { println("interval: $it") }
Output:
interval: 0
interval: 1
interval: 2
interval: 3
interval: 4
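The timing described above can be sketched as a simple loop inside the standard flow builder. intervalSketch is a hypothetical name and this is only an approximation: unlike a real timer, it lets slow collectors push later emissions back.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): emits 0, 1, 2, ... forever.
fun intervalSketch(initialDelay: Duration, period: Duration): Flow<Long> = flow {
  delay(initialDelay) // wait before the first value
  var count = 0L
  while (true) {
    emit(count++)
    delay(period)     // wait between consecutive values
  }
}

fun main() = runBlocking {
  println(intervalSketch(10.milliseconds, 20.milliseconds).take(3).toList())
  // [0, 1, 2]
}
```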
- Similar to RxJS NEVER
Returns a NeverFlow that never emits any values to the FlowCollector and never completes.
neverFlow()
.startWith(7)
.collect { println("neverFlow: $it") }
println("Completed!")
Output:
neverFlow: 7
// Never prints "Completed!"
- ReactiveX docs: http://reactivex.io/documentation/operators/amb.html
- Similar to RxJava amb.
- Similar to RxJS race
Mirrors the one Flow in an Iterable of several Flows that first either emits a value or sends a termination event (error or complete event).
When you pass a number of source Flows to race, it will pass through the emissions and events of exactly one of these Flows: the first one that sends an event to race, either by emitting a value or sending an error or complete event.
race will cancel the emissions and events of all of the other source Flows.
race(
flow {
delay(100)
emit(1)
emit(2)
emit(3)
},
flow {
delay(200)
emit(2)
emit(3)
emit(4)
}
).collect { println("race: $it") }
Output:
race: 1
race: 2
race: 3
- ReactiveX docs: http://reactivex.io/documentation/operators/range.html
- Similar to RxJS range
Creates a Flow that emits a sequence of numbers within a specified range.
range(start = 0, count = 5)
.collect { println("range: $it") }
Output:
range: 0
range: 1
range: 2
range: 3
range: 4
- ReactiveX docs: http://reactivex.io/documentation/operators/timer.html
- Similar to RxJS timer
Creates a Flow that will wait for a given duration, before emitting the value.
timer(value = Unit, duration = 1.seconds)
.collect { println("timer: $it") }
Output:
// After 1 second
timer: kotlin.Unit
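As a rough illustration of the behaviour, a single delayed emission can be written with the standard flow builder. timerSketch is a hypothetical name, and this is a sketch rather than FlowExt's implementation.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): waits, emits one value, then completes.
fun <T> timerSketch(value: T, duration: Duration): Flow<T> = flow {
  delay(duration)
  emit(value)
}

fun main() = runBlocking {
  println(timerSketch("done", 50.milliseconds).toList()) // [done]
}
```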
- ReactiveX docs: https://reactivex.io/documentation/operators/combinelatest.html
combine versions for 6 - 12 Flows.
- Similar to RxJava cast
Adapt this Flow to be a Flow<R>.
This Flow is wrapped as a Flow<R> which checks at run-time that each value event emitted by this Flow is also an instance of R.
At the collection time, if this Flow has any value that is not an instance of R, a ClassCastException will be thrown.
flowOf<Any?>(1, 2, 3)
.cast<Int>()
.collect { v: Int -> println("cast: $v") }
Output:
cast: 1
cast: 2
cast: 3
Adapt this Flow<T?> to be a Flow<T>.
At the collection time, if this Flow has any null value, a NullPointerException will be thrown.
flowOf<Int?>(1, 2, 3)
.castNotNull()
.collect { v: Int -> println("castNotNull: $v") }
Output:
castNotNull: 1
castNotNull: 2
castNotNull: 3
Adapt this Flow<*> to be a Flow<R?>.
At the collection time, if this Flow has any value that is not an instance of R, null will be emitted.
flowOf<Any?>(1, 2, 3, "Kotlin", null)
.safeCast<Int?>()
.collect { v: Int? -> println("safeCast: $v") }
Output:
safeCast: 1
safeCast: 2
safeCast: 3
safeCast: null
safeCast: null
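The difference between the throwing and the null-producing casts can be sketched with map and Kotlin's as / as? operators. The names castSketch and safeCastSketch are hypothetical, and the exception type shown is what the JVM produces; this is an approximation, not FlowExt's source.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helpers (not part of FlowExt).
// `as` throws at collection time when a value is not an R.
inline fun <reified R> Flow<*>.castSketch(): Flow<R> = map { it as R }

// `as?` replaces every value that is not an R with null.
inline fun <reified R> Flow<*>.safeCastSketch(): Flow<R?> = map { it as? R }

fun main() = runBlocking {
  val source = flowOf<Any?>(1, "Kotlin", null)
  println(source.safeCastSketch<Int>().toList()) // [1, null, null]

  // Nothing is checked until collection; the failure surfaces inside toList().
  val result = runCatching { source.castSketch<Int>().toList() }
  println(result.exceptionOrNull() is ClassCastException) // true (on the JVM)
}
```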
- Similar to RxJS concatWith
- Similar to RxJava concatWith
Returns a Flow that emits the items emitted from the current Flow, then the next, one after the other, without interleaving them.
Note, plus is an alias to concatWith.
flowOf(1, 2, 3)
.concatWith(flowOf(4, 5, 6))
.collect { println("concatWith: $it") }
println("---")
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
(flow1 + flow2).collect { println("plus: $it") }
Output:
concatWith: 1
concatWith: 2
concatWith: 3
concatWith: 4
concatWith: 5
concatWith: 6
---
plus: 1
plus: 2
plus: 3
plus: 4
plus: 5
plus: 6
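Sequential concatenation falls out naturally from emitAll, which suspends until the Flow it is given completes. A minimal sketch, assuming a hypothetical concatWithSketch helper (not FlowExt's implementation):

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <T> Flow<T>.concatWithSketch(other: Flow<T>): Flow<T> = flow {
  emitAll(this@concatWithSketch) // finishes before the next line starts
  emitAll(other)                 // collected only after the first Flow completes
}

fun main() = runBlocking {
  println(flowOf(1, 2, 3).concatWithSketch(flowOf(4, 5, 6)).toList())
  // [1, 2, 3, 4, 5, 6]
}
```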
- Similar to RxJS startWith
- Similar to RxJava startWith
Returns a Flow that emits a specified item (or many items) before it begins to emit items emitted by the current Flow.
flowOf(1, 2, 3)
.startWith(0)
.collect { println("startWith: $it") }
Output:
startWith: 0
startWith: 1
startWith: 2
startWith: 3
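For a single leading item, similar behaviour is available from the standard onStart operator, whose action may emit. The sketch below assumes a hypothetical startWithSketch helper and is not necessarily how FlowExt implements startWith.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): emits `item` before the upstream is collected.
fun <T> Flow<T>.startWithSketch(item: T): Flow<T> = onStart { emit(item) }

fun main() = runBlocking {
  println(flowOf(1, 2, 3).startWithSketch(0).toList()) // [0, 1, 2, 3]
}
```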
- Similar to RxJS exhaustMap
- Similar to RxSwift flatMapFirst
Projects each source value to a Flow, which is merged into the output Flow only if the previous projected Flow has completed.
If a value is received while a projected Flow is still being merged, it is simply ignored.
This method is a shortcut for map(transform).flattenFirst().
range(1, 5)
.onEach { delay(100) }
.flatMapFirst { timer(it, 130) }
.collect { println("flatMapFirst: $it") }
Output:
flatMapFirst: 1
flatMapFirst: 3
flatMapFirst: 5
- Similar to RxJS exhaustAll
Converts a higher-order Flow into a first-order Flow by dropping inner Flows while the previous inner Flow has not yet completed.
range(1, 5)
.onEach { delay(100) }
.map { timer(it, 130) }
.flattenFirst()
.collect { println("flattenFirst: $it") }
Output:
flattenFirst: 1
flattenFirst: 3
flattenFirst: 5
- Similar to RxJava groupBy
Groups the items emitted by the current Flow according to a specified criterion, and emits these grouped items as GroupedFlows.
range(1, 10)
.groupBy { it % 2 }
.flatMapMerge { groupedFlow ->
groupedFlow
.map { groupedFlow.key to it }
}
.collect { println("groupBy: $it") }
Output:
groupBy: (1, 1)
groupBy: (0, 2)
groupBy: (1, 3)
groupBy: (0, 4)
groupBy: (1, 5)
groupBy: (0, 6)
groupBy: (1, 7)
groupBy: (0, 8)
groupBy: (1, 9)
groupBy: (0, 10)
- Similar to RxJS ignoreElements
Ignores all elements emitted by the source Flow, only passes calls of complete or error.
flowOf("you", "talking", "to", "me")
.ignoreElements()
.materialize()
.collect { println("ignoreElements: $it") }
Output:
ignoreElements: Event.Complete
- Similar to RxJava concatMapEager
Transforms elements emitted by the original Flow by applying transform, that returns another flow, and then merging and flattening these flows.
This operator calls transform sequentially and then concatenates the resulting flows with a concurrency limit on the number of concurrently collected flows.
It is a shortcut for map(transform).flattenConcatEager(concurrency).
range(1, 5)
.onEach { delay(100) }
.flatMapConcatEager(concurrency = 2) { v ->
timer(v, 130)
.onStart { println("flatMapConcatEager: onStart $v") }
.onCompletion { println("flatMapConcatEager: onCompletion $v") }
}
.collect { println("flatMapConcatEager: $it") }
Output:
flatMapConcatEager: onStart 1
flatMapConcatEager: onStart 2
flatMapConcatEager: 1
flatMapConcatEager: onCompletion 1
flatMapConcatEager: onStart 3
flatMapConcatEager: 2
flatMapConcatEager: onCompletion 2
flatMapConcatEager: onStart 4
flatMapConcatEager: 3
flatMapConcatEager: onCompletion 3
flatMapConcatEager: onStart 5
flatMapConcatEager: 4
flatMapConcatEager: onCompletion 4
flatMapConcatEager: 5
flatMapConcatEager: onCompletion 5
Returns a flow containing the results of applying the given transform function to each value and its index in the original flow.
range(1, 3)
.mapIndexed { index, value -> index to value }
.collect { println("mapIndexed: $it") }
Output:
mapIndexed: (0, 1)
mapIndexed: (1, 2)
mapIndexed: (2, 3)
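An equivalent result can be built from the standard withIndex operator followed by map. This is a sketch with a hypothetical mapIndexedSketch name, shown only to illustrate where the index comes from.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <T, R> Flow<T>.mapIndexedSketch(
  transform: suspend (index: Int, value: T) -> R
): Flow<R> =
  withIndex() // wraps each value as IndexedValue(index, value), starting at 0
    .map { (index, value) -> transform(index, value) }

fun main() = runBlocking {
  println(flowOf("a", "b", "c").mapIndexedSketch { i, v -> "$i:$v" }.toList())
  // [0:a, 1:b, 2:c]
}
```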
- Similar to RxJS mapTo
Emits the given constant value on the output Flow every time the source Flow emits a value.
range(1, 3)
.mapTo("Value")
.collect { println("mapTo: $it") }
Output:
mapTo: Value
mapTo: Value
mapTo: Value
Emits kotlin.Unit value on the output Flow every time the source Flow emits a value.
range(1, 3)
.mapToUnit()
.collect { println("mapToUnit: $it") }
Output:
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
- Similar to RxJS materialize
- Similar to RxJava materialize
Represents all of the notifications from the source Flow as value emissions marked with their original types within Event objects.
flowOf(1, 2, 3)
.materialize()
.collect { println("materialize: $it") }
Output:
materialize: Event.Value(1)
materialize: Event.Value(2)
materialize: Event.Value(3)
materialize: Event.Complete
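To show how values, completion, and errors can all become ordinary emissions, here is a sketch built on the standard onCompletion and catch operators. SketchEvent and materializeSketch are hypothetical stand-ins defined for this example; they are not FlowExt's Event type or implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical event type for this sketch only.
sealed interface SketchEvent<out T> {
  data class Value<T>(val value: T) : SketchEvent<T>
  data class Error(val error: Throwable) : SketchEvent<Nothing>
  object Complete : SketchEvent<Nothing>
}

// Hypothetical helper (not part of FlowExt).
fun <T> Flow<T>.materializeSketch(): Flow<SketchEvent<T>> =
  map<T, SketchEvent<T>> { SketchEvent.Value(it) }
    // cause == null means the upstream completed normally
    .onCompletion { cause -> if (cause == null) emit(SketchEvent.Complete) }
    // an upstream failure becomes a final Error event instead of an exception
    .catch { e -> emit(SketchEvent.Error(e)) }

fun main() = runBlocking {
  val events = flowOf(1, 2).materializeSketch().toList()
  println(events == listOf(SketchEvent.Value(1), SketchEvent.Value(2), SketchEvent.Complete))
  // true
}
```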
- Similar to RxJS dematerialize
- Similar to RxJava dematerialize
Converts a Flow of Event objects into the emissions that they represent.
flowOf(Event.Value(1), Event.Value(2), Event.Value(3))
.dematerialize()
.collect { println("dematerialize: $it") }
Output:
dematerialize: 1
dematerialize: 2
dematerialize: 3
- ReactiveX docs: http://reactivex.io/documentation/operators/amb.html
- Similar to RxJava ambWith.
- Similar to RxJS raceWith
Mirrors the current Flow or the other Flows provided of which the first either emits a value or sends a termination event (error or complete event).
flow {
delay(100)
emit(1)
emit(2)
emit(3)
}.raceWith(
flow {
delay(200)
emit(2)
emit(3)
emit(4)
}
).collect { println("raceWith: $it") }
Output:
raceWith: 1
raceWith: 2
raceWith: 3
- Similar to RxJS pairwise
Groups pairs of consecutive emissions together and emits them as a pair.
Emits the (n)th and (n-1)th events as a pair.
The first value won't be emitted until the second one arrives.
Note, zipWithNext is an alias to pairwise.
range(0, 4)
.pairwise()
.collect { println("pairwise: $it") }
println("---")
range(0, 4)
.zipWithNext { a, b -> "$a -> $b" }
.collect { println("zipWithNext: $it") }
Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
---
zipWithNext: 0 -> 1
zipWithNext: 1 -> 2
zipWithNext: 2 -> 3
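The "remember the previous value" mechanism can be sketched with a small amount of state inside the standard flow builder. pairwiseSketch is a hypothetical name; this is an illustration, not FlowExt's source.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.pairwiseSketch(): Flow<Pair<T, T>> = flow {
  var hasPrevious = false // a flag is needed because T itself may be nullable
  var previous: Any? = null
  collect { value ->
    if (hasPrevious) emit((previous as T) to value)
    previous = value
    hasPrevious = true
  }
}

fun main() = runBlocking {
  println((0 until 4).asFlow().pairwiseSketch().toList())
  // [(0, 1), (1, 2), (2, 3)]
}
```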
- Similar to RxJS repeat
Returns a Flow that re-collects the source stream each time the source stream completes.
flowFromSuspend {
println("Start collecting...")
Random
.nextInt(0..3)
.also { println("Emit: $it") }
}
.repeat(
delay = 1.seconds,
count = 10
)
.filter { it == 2 }
.take(1)
.collect { println("repeat: $it") }
Output:
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 1
Start collecting...
Emit: 0
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 2
repeat: 2
Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true. The predicate also receives an attempt number as parameter, starting from zero on the initial call. When predicate returns true, the next retries will be delayed after a duration computed by DelayStrategy.nextDelay.
- ReactiveX docs: https://reactivex.io/documentation/operators/retry.html
var count = -1
flowFromSuspend {
++count
println("Call count=$count")
when (count) {
0 -> throw MyException(message = "Will retry...", cause = null)
1 -> "Result: count=$count"
else -> error("Unexpected: count=$count")
}
}
.retryWhenWithDelayStrategy(
strategy = DelayStrategy.FixedTimeDelayStrategy(duration = 200.milliseconds),
predicate = { cause, attempt -> cause is MyException && attempt < 1 }
)
.collect { println("retryWhenWithDelayStrategy: $it") }
Output:
Call count=0
Call count=1
retryWhenWithDelayStrategy: Result: count=1
- ReactiveX docs: https://reactivex.io/documentation/operators/retry.html
Retries collection of the given flow with exponential backoff delay strategy when an exception occurs in the upstream flow and the predicate returns true. When predicate returns true, the next retries will be delayed after a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
var count = -1
flowFromSuspend {
++count
println("Call count=$count")
when (count) {
0 -> throw MyException(message = "Will retry...", cause = null)
1 -> "Result: count=$count"
else -> error("Unexpected: count=$count")
}
}
.retryWhenWithExponentialBackoff(
initialDelay = 500.milliseconds,
factor = 2.0,
) { cause, attempt -> cause is MyException && attempt < 1 }
.collect { println("retryWhenWithExponentialBackoff: $it") }
Output:
Call count=0
Call count=1
retryWhenWithExponentialBackoff: Result: count=1
- ReactiveX docs: https://reactivex.io/documentation/operators/retry.html
Retries collection of the given flow with exponential backoff delay strategy when an exception occurs in the upstream flow and the predicate returns true. When predicate returns true, the next retries will be delayed after a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
var count = -1
flowFromSuspend {
++count
println("Call count=$count")
when (count) {
0 -> throw MyException(message = "Will retry...", cause = null)
1 -> "Result: count=$count"
else -> error("Unexpected: count=$count")
}
}
.retryWithExponentialBackoff(
maxAttempt = 2,
initialDelay = 500.milliseconds,
factor = 2.0,
) { it is MyException }
.collect { println("retryWithExponentialBackoff: $it") }
Output:
Call count=0
Call count=1
retryWithExponentialBackoff: Result: count=1
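The backoff arithmetic itself can be illustrated on top of the standard retryWhen operator: the delay before retry number n (counting from zero) is taken here as initialDelay * factor^n. This formula, the retryWithBackoffSketch name, and the absence of a maximum delay are assumptions of this sketch; FlowExt's ExponentialBackoffDelayStrategy may compute delays differently.

```kotlin
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <T> Flow<T>.retryWithBackoffSketch(
  maxAttempt: Long,
  initialDelay: Duration,
  factor: Double,
  predicate: (Throwable) -> Boolean = { true },
): Flow<T> = retryWhen { cause, attempt ->
  if (attempt < maxAttempt && predicate(cause)) {
    // attempt is 0 for the first retry, so the waits grow as d, d*f, d*f^2, ...
    delay(initialDelay * factor.pow(attempt.toDouble()))
    true
  } else {
    false // give up and let the exception propagate
  }
}

fun main() = runBlocking {
  var calls = 0
  val source = flow {
    calls++
    if (calls < 3) throw IllegalStateException("call $calls failed")
    emit("succeeded on call $calls")
  }
  println(source.retryWithBackoffSketch(maxAttempt = 5, initialDelay = 10.milliseconds, factor = 2.0).toList())
  // [succeeded on call 3]
}
```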
- Similar to RxJava scanWith
Folds the given flow with operation, emitting every intermediate result, including the initial value supplied by initialSupplier at the collection time.
This is a variant of scan in which the initial value is lazily supplied, which is useful when the initial value is expensive to create or depends on logic that should be executed at the collection time (lazy semantics).
var count = 0
val mutex = Mutex()
suspend fun calculateInitialValue(): Int {
println("calculateInitialValue")
delay(1000)
return mutex.withLock { count++ }
}
flowOf(1, 2, 3)
.scanWith(::calculateInitialValue) { acc, e -> acc + e }
.collect { println("scanWith[1]: $it") }
flowOf(1, 2, 3)
.scanWith(::calculateInitialValue) { acc, e -> acc + e }
.collect { println("scanWith[2]: $it") }
Output:
calculateInitialValue
scanWith[1]: 0
scanWith[1]: 1
scanWith[1]: 3
scanWith[1]: 6
calculateInitialValue
scanWith[2]: 1
scanWith[2]: 2
scanWith[2]: 4
scanWith[2]: 7
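The lazy-seed behaviour can be approximated by deferring a call to the standard scan operator until collection time. scanWithSketch is a hypothetical name and this is a sketch under that assumption, not FlowExt's implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <T, R> Flow<T>.scanWithSketch(
  initialSupplier: suspend () -> R,
  operation: suspend (acc: R, value: T) -> R,
): Flow<R> = flow {
  // initialSupplier runs here, once per collection, rather than when the operator is applied
  emitAll(this@scanWithSketch.scan(initialSupplier(), operation))
}

fun main() = runBlocking {
  var seed = 0
  val flow = flowOf(1, 2, 3).scanWithSketch({ seed++ }) { acc, e -> acc + e }
  println(flow.toList()) // [0, 1, 3, 6]
  println(flow.toList()) // [1, 2, 4, 7]
}
```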
- Inspired by the NgRx memoized selector.
- Selectors are pure functions used for obtaining slices of a Flow of state. FlowExt provides a few helper functions for optimizing this selection.
- Selectors can compute derived data, allowing you to store the minimal possible state.
- Selectors are efficient. A selector is not recomputed unless one of its arguments changes.
- When using the select functions, FlowExt keeps track of the latest arguments with which your selector function was invoked. Because selectors are pure functions, the last result can be returned when the arguments match without re-invoking your selector function. This can provide performance benefits, particularly with selectors that perform expensive computation. This practice is known as memoization.
data class UiState(
val items: List<String> = emptyList(),
val term: String? = null,
val isLoading: Boolean = false,
val error: Throwable? = null
)
flow {
println("select: emit 1")
emit(UiState())
println("select: emit 2")
emit(
UiState(
items = listOf("a", "b", "c"),
term = "a",
isLoading = true,
error = Throwable("error")
)
)
println("select: emit 3")
emit(
UiState(
items = listOf("a", "b", "c"),
term = "a",
isLoading = false,
error = Throwable("error")
)
)
println("select: emit 4")
emit(
UiState(
items = listOf("a", "b", "c"),
term = "b",
isLoading = false,
error = Throwable("error")
)
)
}
.select(
selector1 = { it.items },
selector2 = { it.term },
projector = { items, term ->
term?.let { v ->
items.filter { it.contains(v, ignoreCase = true) }
}
}
)
.collect { println("select: $it") }
Output:
select: emit 1
select: null
select: emit 2
select: [a]
select: emit 3
select: emit 4
select: [b]
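The memoization described above can be approximated with standard operators: compare the selected slices with distinctUntilChanged before running the projector, then compare the projected results. selectSketch and SketchState are hypothetical names, and this is only an approximation of the idea, not FlowExt's select implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical state type for this sketch only.
data class SketchState(val items: List<String>, val term: String?, val isLoading: Boolean)

// Hypothetical helper (not part of FlowExt).
fun <State, A, B, R> Flow<State>.selectSketch(
  selector1: (State) -> A,
  selector2: (State) -> B,
  projector: (A, B) -> R,
): Flow<R> =
  map { state -> selector1(state) to selector2(state) }
    .distinctUntilChanged() // skip the projector when both slices are unchanged
    .map { (a, b) -> projector(a, b) }
    .distinctUntilChanged() // skip downstream emissions when the result is unchanged

fun main() = runBlocking {
  val states = flowOf(
    SketchState(listOf("a", "b"), "a", isLoading = true),
    SketchState(listOf("a", "b"), "a", isLoading = false), // only isLoading changed
    SketchState(listOf("a", "b"), "b", isLoading = false),
  )
  val selected = states.selectSketch(
    selector1 = { it.items },
    selector2 = { it.term },
    projector = { items, term -> items.filter { it == term } },
  )
  println(selected.toList()) // [[a], [b]]
}
```

The second state changes only isLoading, which neither selector reads, so the projector is not invoked for it.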
- ReactiveX docs: https://reactivex.io/documentation/operators/skipuntil.html
- Similar to RxJS skipUntil
- Similar to RxJava skipUntil
Returns a Flow that skips items emitted by the source Flow until a second Flow emits a value or completes.
flowOf(1, 2, 3)
.onEach { delay(100) }
.skipUntil(timer(Unit, 150))
.collect { println("skipUntil: $it") }
Output:
skipUntil: 2
skipUntil: 3
- ReactiveX docs: http://reactivex.io/documentation/operators/takeuntil.html
- Similar to RxJS takeUntil
Emits the values emitted by the source Flow until a notifier Flow emits a value or completes.
range(0, 5)
.onEach { delay(100) }
.takeUntil(timer(Unit, 270.milliseconds))
.collect { println("takeUntil: $it") }
Output:
takeUntil: 0
takeUntil: 1
- ReactiveX docs: https://reactivex.io/documentation/operators/debounce.html
- Similar to RxJS throttleTime
Returns a Flow that emits a value from the source Flow, then ignores subsequent source values for a duration determined by durationSelector, then repeats this process for the next source value.
(1..10)
.asFlow()
.onEach { delay(200) }
.throttleTime(500)
.collect { println("throttleTime: $it") }
Output:
throttleTime: 1
throttleTime: 4
throttleTime: 7
throttleTime: 10
- RxMarbles: https://rxmarbles.com/#withLatestFrom
- Similar to RxJS withLatestFrom
Merges two Flows into one Flow by combining each value from self with the latest value from the second Flow, if any.
Values emitted by self before the second Flow has emitted any values will be omitted.
range(0, 5)
.onEach { delay(100) }
.withLatestFrom(
range(0, 10)
.onEach { delay(70) }
)
.collect { println("withLatestFrom: $it") }
Output:
withLatestFrom: (0, 0)
withLatestFrom: (1, 1)
withLatestFrom: (2, 3)
withLatestFrom: (3, 4)
withLatestFrom: (4, 6)
... and more, please check out Docs 0.x/Docs snapshot.
MIT License
Copyright (c) 2021-2023 Petrus Nguyễn Thái Học