2.x: Fix refCount termination-reconnect race #6187

Merged: 3 commits into ReactiveX:2.x on Aug 30, 2018

akarnokd
Member

This PR modifies the refCount operator (in both Flowable and Observable types) to avoid certain termination-reconnection races.

The original race could happen when the refCounted source terminated at the same time as new observers arrived. Those new observers effectively joined a dying connection and could be cut off by the dispose call, which left them hanging without a terminal event.

The change involves a new internal interface, ResettableConnectable, that allows resetting the connection object inside the connectable source, but only if it is the same connection object known to the initiator of the original connect call.
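The "reset only if it is still the same connection" idea can be sketched in plain Java as a compare-and-set on the current connection reference. This is a minimal illustration under stated assumptions, not the code in this PR: the class names `ResetSketch`, `Connection`, and `Connectable`, and the method `resetIf`, are hypothetical and do not appear in RxJava.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ResetSketch {
    /** Hypothetical stand-in for the connection object held by a connectable source. */
    static final class Connection { }

    /** Hypothetical stand-in for a connectable source that tracks its current connection. */
    static final class Connectable {
        final AtomicReference<Connection> current = new AtomicReference<>();

        /** Installs a fresh connection and returns it to the caller. */
        Connection connect() {
            Connection c = new Connection();
            current.set(c);
            return c;
        }

        /** Clears the current connection only if it is still the one the caller knows about. */
        boolean resetIf(Connection known) {
            return current.compareAndSet(known, null);
        }
    }

    public static void main(String[] args) {
        Connectable source = new Connectable();
        Connection first = source.connect();
        Connection second = source.connect(); // a newer connection replaced the first one

        System.out.println(source.resetIf(first));  // false: stale caller, nothing is reset
        System.out.println(source.resetIf(second)); // true: caller owns the current connection
    }
}
```

The identity check is what keeps a late reset, issued on behalf of an old connection, from wiping out a connection that newer observers have already joined.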

Fixes #6185

akarnokd added this to the 2.2 backlog milestone on Aug 30, 2018

codecov bot commented Aug 30, 2018

Codecov Report

Merging #6187 into 2.x will decrease coverage by 0.01%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##                2.x    #6187      +/-   ##
============================================
- Coverage     98.26%   98.24%   -0.02%     
- Complexity     6197     6203       +6     
============================================
  Files           667      667              
  Lines         44880    44888       +8     
  Branches       6214     6218       +4     
============================================
  Hits          44100    44100              
- Misses          245      251       +6     
- Partials        535      537       +2

Impacted Files | Coverage Δ | Complexity Δ
--- | --- | ---
...ex/internal/operators/flowable/FlowableReplay.java | 95.11% <100%> (+0.61%) | 20 <1> (-3) ⬇️
...nternal/operators/observable/ObservableReplay.java | 99.17% <100%> (+0.54%) | 20 <1> (-3) ⬇️
...ernal/operators/observable/ObservableRefCount.java | 100% <100%> (ø) | 27 <0> (+5) ⬆️
.../internal/operators/flowable/FlowableRefCount.java | 100% <100%> (ø) | 27 <0> (+5) ⬆️
.../operators/observable/ObservableFlatMapSingle.java | 88.8% <0%> (-6.72%) | 2% <0%> (ø)
...tivex/internal/observers/FutureSingleObserver.java | 94.33% <0%> (-3.78%) | 24% <0%> (-1%)
...ternal/operators/observable/ObservablePublish.java | 96.46% <0%> (-3.54%) | 11% <0%> (ø)
...activex/internal/observers/QueueDrainObserver.java | 97.43% <0%> (-2.57%) | 21% <0%> (-1%)
...ava/io/reactivex/processors/BehaviorProcessor.java | 96.86% <0%> (-2.25%) | 60% <0%> (ø)
...ernal/operators/flowable/FlowableFromIterable.java | 95.18% <0%> (-2.14%) | 5% <0%> (ø)
... and 24 more | |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 2e566fb...05ac5d1.

public interface ResettableConnectable {

/**
* Reset the connectable if the current internal connection object is the
Collaborator

Find it hard to read and follow. Could we link the param in here so it's understandable what's the internal and what's the provided object?

Member Author

Changed the description.

Collaborator

vanniktech left a comment

Much better.

akarnokd merged commit c7d91c6 into ReactiveX:2.x on Aug 30, 2018
akarnokd deleted the RefCountFix branch on August 30, 2018 13:30
Collaborator

davidmoten left a comment

LGTM, thanks

@davidmoten
Collaborator

Thanks for the fix @akarnokd and the report @BernhardReu. A release in the next couple of weeks with this change would be very welcome (.share is a critical path for me). /cc @SuperEvenSteven @philipgroom.

@akarnokd
Member Author

akarnokd commented Sep 3, 2018

@davidmoten Sure. #6194

@jhansche

@akarnokd This seems to fix the issue for .replay().refCount(), but I'm seeing the same test fail intermittently with .publish().refCount() (aka share()):

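    // Imports needed at the top of the test file:
    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers
    import java.util.concurrent.TimeUnit
    import org.junit.Test

    @Test
    fun `test refcount race conditions`() {
        // Repeat many times: the race only shows up intermittently.
        for (i in 0 until 10_000) {
            val observable = Observable.just(1).publish().refCount()

            // Two observers subscribe concurrently on the io scheduler.
            val observer1 = observable
                .subscribeOn(Schedulers.io())
                .test()

            val observer2 = observable
                .subscribeOn(Schedulers.io())
                .test()

            // Each observer is expected to receive the value 1 and then complete.
            observer1
                .withTag("observer1 $i")
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1)

            observer2
                .withTag("observer2 $i")
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1)
        }
    }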

The actual failure is obviously unpredictable due to the race conditions, but it usually fails pretty early. In this case, it failed at i=14:

java.lang.AssertionError: Value count differs; expected: 1 [1] but was: 0 [] (latch = 1, values = 0, errors = 0, completions = 0, timeout!, disposed!, tag = observer2 14)
Expected :1 [1] 
Actual   :0 [] (latch = 1, values = 0, errors = 0, completions = 0, timeout!, disposed!, tag = observer2 14)

@akarnokd
Member Author

@jhansche It is possible that one of the observers connects right between onNext and onComplete, in which case it may not receive the value. However, both observers should still complete, with no timeout. I opened a fresh issue, #6501, and will investigate further tomorrow.
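The expected behavior described here, a late observer that misses the value but still sees completion, can be illustrated with a small plain-Java sketch. It is a single-threaded model under stated assumptions, not RxJava code: `LateSubscriberSketch`, `Recorder`, and `Multicaster` are hypothetical names, and the interleaving that a real race would produce by chance is fixed by hand.

```java
import java.util.ArrayList;
import java.util.List;

public class LateSubscriberSketch {
    /** Hypothetical observer that records what it receives. */
    static final class Recorder {
        final List<Integer> values = new ArrayList<>();
        boolean completed;
    }

    /** Hypothetical hot multicaster: events reach only the observers present at that moment. */
    static final class Multicaster {
        private final List<Recorder> observers = new ArrayList<>();

        void subscribe(Recorder r) { observers.add(r); }

        void onNext(int v) {
            for (Recorder r : observers) {
                r.values.add(v);
            }
        }

        void onComplete() {
            for (Recorder r : observers) {
                r.completed = true;
            }
        }
    }

    public static void main(String[] args) {
        Multicaster source = new Multicaster();
        Recorder first = new Recorder();
        Recorder second = new Recorder();

        source.subscribe(first);
        source.onNext(1);
        source.subscribe(second); // arrives between onNext and onComplete
        source.onComplete();

        System.out.println(first.values + " " + first.completed);   // [1] true
        System.out.println(second.values + " " + second.completed); // [] true
    }
}
```

In this model the second observer ends with no values but a completion signal. The failure reported above differs in that the observer timed out without any terminal event.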

Successfully merging this pull request may close these issues.

Possible concurrency issue with .refCount()? (2.2.0)