Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add Observable.rangeLong & Flowable.rangeLong #4687

Merged
merged 5 commits into from
Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3455,6 +3455,50 @@ public static Flowable<Integer> range(int start, int count) {
return RxJavaPlugins.onAssembly(new FlowableRange(start, count));
}

/**
* Returns a Flowable that emits a sequence of Longs within a specified range.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/range.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and signals values on-demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code rangeLong} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param start
* the value of the first Long in the sequence
* @param count
* the number of sequential Longs to generate
* @return a Flowable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} &minus; 1 exceeds
* {@code Long.MAX_VALUE}
* @see <a href="http://reactivex.io/documentation/operators/range.html">ReactiveX operators documentation: Range</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static Flowable<Long> rangeLong(long start, long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}

if (count == 0) {
return empty();
}

if (count == 1) {
return just(start);
}

long end = start + (count - 1);
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}

return RxJavaPlugins.onAssembly(new FlowableRangeLong(start, count));
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2992,6 +2992,47 @@ public static Observable<Integer> range(final int start, final int count) {
return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
}

/**
* Returns an Observable that emits a sequence of Longs within a specified range.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/range.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code rangeLong} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param start
* the value of the first Long in the sequence
* @param count
* the number of sequential Longs to generate
* @return an Observable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} &minus; 1 exceeds
* {@code Long.MAX_VALUE}
* @see <a href="http://reactivex.io/documentation/operators/range.html">ReactiveX operators documentation: Range</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable<Long> rangeLong(long start, long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}

if (count == 0) {
return empty();
}

if (count == 1) {
return just(start);
}

long end = start + (count - 1);
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}

return RxJavaPlugins.onAssembly(new ObservableRangeLong(start, count));
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import io.reactivex.Flowable;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.subscriptions.BasicQueueSubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import org.reactivestreams.Subscriber;

/**
* Emits a range of long values.
*/
public final class FlowableRangeLong extends Flowable<Long> {
final long start;
final long end;

public FlowableRangeLong(long start, long count) {
this.start = start;
this.end = start + count;
}

@Override
public void subscribeActual(Subscriber<? super Long> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new RangeConditionalSubscription(
(ConditionalSubscriber<? super Long>)s, start, end));
} else {
s.onSubscribe(new RangeSubscription(s, start, end));
}
}

abstract static class BaseRangeSubscription extends BasicQueueSubscription<Long> {

private static final long serialVersionUID = -2252972430506210021L;

final long end;

long index;

volatile boolean cancelled;

BaseRangeSubscription(long index, long end) {
this.index = index;
this.end = end;
}

@Override
public final int requestFusion(int mode) {
return mode & SYNC;
}

@Override
public final Long poll() {
long i = index;
if (i == end) {
return null;
}
index = i + 1;
return i;
}

@Override
public final boolean isEmpty() {
return index == end;
}

@Override
public final void clear() {
index = end;
}

@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
if (BackpressureHelper.add(this, n) == 0L) {
if (n == Long.MAX_VALUE) {
fastPath();
} else {
slowPath(n);
}
}
}
}

@Override
public final void cancel() {
cancelled = true;
}


abstract void fastPath();

abstract void slowPath(long r);
}

static final class RangeSubscription extends BaseRangeSubscription {

private static final long serialVersionUID = 2587302975077663557L;

final Subscriber<? super Long> actual;

RangeSubscription(Subscriber<? super Long> actual, long index, long end) {
super(index, end);
this.actual = actual;
}

@Override
void fastPath() {
long f = end;
Subscriber<? super Long> a = actual;

for (long i = index; i != f; i++) {
if (cancelled) {
return;
}
a.onNext(i);
}
if (cancelled) {
return;
}
a.onComplete();
}

@Override
void slowPath(long r) {
long e = 0;
long f = end;
long i = index;
Subscriber<? super Long> a = actual;

for (;;) {

while (e != r && i != f) {
if (cancelled) {
return;
}

a.onNext(i);

e++;
i++;
}

if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}

r = get();
if (e == r) {
index = i;
r = addAndGet(-e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}

static final class RangeConditionalSubscription extends BaseRangeSubscription {


private static final long serialVersionUID = 2587302975077663557L;

final ConditionalSubscriber<? super Long> actual;

RangeConditionalSubscription(ConditionalSubscriber<? super Long> actual, long index, long end) {
super(index, end);
this.actual = actual;
}

@Override
void fastPath() {
long f = end;
ConditionalSubscriber<? super Long> a = actual;

for (long i = index; i != f; i++) {
if (cancelled) {
return;
}
a.tryOnNext(i);
}
if (cancelled) {
return;
}
a.onComplete();
}

@Override
void slowPath(long r) {
long e = 0;
long f = end;
long i = index;
ConditionalSubscriber<? super Long> a = actual;

for (;;) {

while (e != r && i != f) {
if (cancelled) {
return;
}

if (a.tryOnNext(i)) {
e++;
}

i++;
}

if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}

r = get();
if (e == r) {
index = i;
r = addAndGet(-e);
if (r == 0) {
return;
}
e = 0;
}
}
}
}
}
Loading