Skip to content

Cannot subscribe to a Retry observable once all subscribers unsubscribed #879

@davidmoten

Description

@davidmoten

For version 0.16.1 and master of rx-java-core . Once all subscribers have unsubscribed from a retry observable it is not possible to subscribe again to it. Unit test below (that would belong in OperatorRetryTest.java if its valid). The same test once changed slightly to conform to code in master fails also but earlier! Code for both 0.16.1 and master is below:

version 0.16.1:

        @Test
    public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() {
        final AtomicInteger subsCount = new AtomicInteger(0);
        OnSubscribeFunc<String> onSubscribe = new OnSubscribeFunc<String>() {
            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                subsCount.incrementAndGet();
                return new Subscription() {

                    @Override
                    public void unsubscribe() {
                        subsCount.decrementAndGet();
                    }
                };
            }
        };
        Observable<String> stream = Observable.create(onSubscribe);
        Observable<String> streamWithRetry = stream.retry();
        Subscription sub = streamWithRetry.subscribe();
        assertEquals(1, subsCount.get());
        sub.unsubscribe();
        assertEquals(0, subsCount.get());
        streamWithRetry.subscribe();
        assertEquals(1, subsCount.get());
    }

version master:

        @Test
    public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed()
            throws InterruptedException {
        final AtomicInteger subsCount = new AtomicInteger(0);
        OnSubscribeFunc<String> onSubscribe = new OnSubscribeFunc<String>() {
            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                subsCount.incrementAndGet();
                return new Subscription() {
                    boolean unsubscribed = false;

                    @Override
                    public void unsubscribe() {
                        subsCount.decrementAndGet();
                        unsubscribed = true;
                    }

                    @Override
                    public boolean isUnsubscribed() {
                        return unsubscribed;
                    }
                };
            }
        };
        Observable<String> stream = Observable.create(onSubscribe);
        Observable<String> streamWithRetry = stream.retry();
        Subscription sub = streamWithRetry.subscribe();
        assertEquals(1, subsCount.get());
        sub.unsubscribe();
        assertEquals(0, subsCount.get());
        streamWithRetry.subscribe();
        assertEquals(1, subsCount.get());
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions