concatEager does not unsubscribe from sources #6404

Closed
rossdanderson opened this issue Feb 12, 2019 · 4 comments · Fixed by #6405


@rossdanderson

Hi, I have a question regarding the behaviour of concatEager in version 2.2.6.

We were recently caught out by the behaviour of concatEager when the downstream unsubscribes before the upstream sources have completed: the upstream sources are not unsubscribed from when this happens. Combined with some downstream logic that periodically resubscribes, this left us with a rather nasty memory leak in our application.

Consider the following two tests (apologies for the Kotlin; I can convert to Java on request, but I hope it makes sense):

    @Test
    fun concat() {
        val _publisher1 = PublishProcessor.create<Unit>()
        val _publisher2 = PublishProcessor.create<Unit>()
        val publisher1 = _publisher1.doOnSubscribe { println("1 Subscribed") }.doOnCancel { println("1 Cancelled") }
        val publisher2 = _publisher2.doOnSubscribe { println("2 Subscribed") }.doOnCancel { println("2 Cancelled") }

        assertFalse(_publisher1.hasSubscribers())
        assertFalse(_publisher2.hasSubscribers())

        val testSubscriber = Flowable.concat(Flowable.just(publisher1, publisher2))
            .subscribe()

        assertTrue(_publisher1.hasSubscribers())
        assertFalse(_publisher2.hasSubscribers())

        testSubscriber.dispose()

        assertFalse(_publisher1.hasSubscribers())
        assertFalse(_publisher2.hasSubscribers())
    }

    @Test
    fun concatEager() {
        val _publisher1 = PublishProcessor.create<Unit>()
        val _publisher2 = PublishProcessor.create<Unit>()
        val publisher1 = _publisher1.doOnSubscribe { println("1 Subscribed") }.doOnCancel { println("1 Cancelled") }
        val publisher2 = _publisher2.doOnSubscribe { println("2 Subscribed") }.doOnCancel { println("2 Cancelled") }

        assertFalse(_publisher1.hasSubscribers())
        assertFalse(_publisher2.hasSubscribers())

        val testSubscriber = Flowable.concatEager(Flowable.just(publisher1, publisher2))
            .subscribe()

        assertTrue(_publisher1.hasSubscribers())
        assertTrue(_publisher2.hasSubscribers())

        testSubscriber.dispose()

        assertFalse(_publisher1.hasSubscribers())
        assertFalse(_publisher2.hasSubscribers())
    }

The first test (concat) completes successfully with the following output:

1 Subscribed
1 Cancelled

As we expected, _publisher1 was unsubscribed from when testSubscriber was disposed.

The second test (concatEager) fails however with the following output:

1 Subscribed
2 Subscribed
2 Cancelled

This is not what we expected: the currently active publisher, _publisher1, is not unsubscribed from when testSubscriber is disposed.

Now, the wording of the documentation does differ between the two. concat:

Returns a Flowable that emits the items emitted by each of the Publishers emitted by the source Publisher, one after the other, without interleaving them.

vs concatEager:

The operator buffers the values emitted by these Publishers and then drains them in order, each one after the previous one completes.

But, to us at least, this was not enough to convey the rather large difference in behaviour on upstream dispose, if that is indeed what it was attempting to convey.

So my questions really are:
1. Is this behaviour expected, or should concatEager dispose all of the upstream sources, including the active one, when it is itself disposed?
2. If this is the expected behaviour, do you think the documentation should/could be improved here?
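
The expectation behind the first question can also be written as a check that works for any number of sources. This is only a minimal sketch, assuming RxJava 2.x is on the classpath; the helper name `sourcesStillSubscribed` and its `count` parameter are made up for illustration and are not part of RxJava.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor

// Hypothetical helper (not part of RxJava): returns the indices of the
// sources that still have a subscriber after the downstream is disposed.
fun sourcesStillSubscribed(count: Int): List<Int> {
    // Hot sources that never complete on their own, as in the tests above.
    val sources = List(count) { PublishProcessor.create<Unit>() }

    // concatEager subscribes to the inner sources eagerly
    // (both sources were subscribed in the concatEager test above).
    val disposable = Flowable.concatEager(Flowable.fromIterable(sources)).subscribe()

    // Dispose the downstream before any source has completed.
    disposable.dispose()

    // Any index left here is a source that was not unsubscribed from.
    return sources.indices.filter { sources[it].hasSubscribers() }
}
```

If disposing the downstream reaches every source, `sourcesStillSubscribed(2)` should return an empty list; a non-empty result would point at the kind of leak described above.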

@akarnokd
Member

concatEager is supposed to dispose all sources. I'll investigate it.

@rossdanderson
Author

Thanks, I appreciate it

@akarnokd
Member

This is a bug in the operator. I posted a fix in #6405.

@rossdanderson
Author

👍 That was very quick, thanks again
