Elements seem to be delayed or skipped during collection when an RxJava Flowable with onErrorReturnItem is converted to a Flow #1766

@bennyhuo

Hi, I use asFlow to convert an RxJava Flowable to a Flow:

suspend fun main() {
    Flowable.create<Int>({ emitter ->
        repeat(5) {
            emitter.onNext(it)
            println("emit: $it")
            Thread.sleep(100)
        }
    }, BackpressureStrategy.BUFFER)
        .onErrorReturnItem(-1)
        .subscribeOn(Schedulers.io())
        .asFlow()
        .collect {
            println("collect: $it")
        }
}

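The producer is slower than the consumer, yet the output is strange: after the first element, nothing is collected until all the remaining elements have been emitted: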

emit: 0
collect: 0
emit: 1
emit: 2
emit: 3
emit: 4
collect: 1
collect: 2
collect: 3
collect: 4

If I use BackpressureStrategy.LATEST instead, the consumer only gets 0 and 4 (the first and the last elements):

emit: 0
collect: 0
emit: 1
emit: 2
emit: 3
emit: 4
collect: 4

I eventually figured out that this may be related to the onErrorReturnItem(-1) call. If I comment it out, the output is as expected:

emit: 0
collect: 0
collect: 1
emit: 1
collect: 2
emit: 2
collect: 3
emit: 3
collect: 4
emit: 4
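
One possible explanation, offered as an assumption rather than a confirmed diagnosis: RxJava's single-argument subscribeOn(scheduler) reroutes request() calls to the scheduler's worker unless the operator directly upstream is Flowable.create. With onErrorReturnItem in between, the requests issued by the collector may be queued behind the emitter loop that is still occupying that worker. The sketch below uses the two-argument overload subscribeOn(scheduler, requestOn) with requestOn = false so that requests are forwarded on the calling thread. It assumes RxJava 2 package names (io.reactivex) and the kotlinx-coroutines-reactive module. The helper name collectWithDirectRequests is hypothetical, and the onComplete() call is an addition so that the flow terminates.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow

// Hypothetical helper (not part of the original report): same pipeline as above,
// but request() calls are not rerouted to the io() worker.
suspend fun collectWithDirectRequests(): List<Int> =
    Flowable.create<Int>({ emitter ->
        repeat(5) {
            emitter.onNext(it)
            println("emit: $it")
            Thread.sleep(100)
        }
        emitter.onComplete() // added so that the resulting Flow completes
    }, BackpressureStrategy.BUFFER)
        .onErrorReturnItem(-1)
        // requestOn = false: forward requests on the caller's thread (assumed workaround)
        .subscribeOn(Schedulers.io(), false)
        .asFlow()
        .onEach { println("collect: $it") }
        .toList()

suspend fun main() {
    println(collectWithDirectRequests())
}
```

If this assumption holds, moving subscribeOn(Schedulers.io()) directly after Flowable.create (before onErrorReturnItem) should have a similar effect, because the single-argument overload then sees the create operator as its immediate upstream.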
