diff --git a/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java index c664717332..75ea9c82cf 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java +++ b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java @@ -18,9 +18,11 @@ import java.util.concurrent.atomic.AtomicInteger; import rx.Observable.OnSubscribe; -import rx.*; +import rx.Subscriber; +import rx.Subscription; import rx.functions.Action1; import rx.observables.ConnectableObservable; +import rx.observers.Subscribers; /** * Wraps a ConnectableObservable and calls its connect() method once @@ -47,7 +49,7 @@ public OnSubscribeAutoConnect(ConnectableObservable source, } @Override public void call(Subscriber child) { - source.unsafeSubscribe(child); + source.unsafeSubscribe(Subscribers.wrap(child)); if (clients.incrementAndGet() == numberOfSubscribers) { source.connect(connection); }