Skip to content

Commit a65d3fc

Browse files
committed
Improved coverage and cancellation management
1 parent e5621f3 commit a65d3fc

30 files changed

+1031
-84
lines changed

gradle/wrapper/gradle-wrapper.jar

0 Bytes
Binary file not shown.
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
#Fri Oct 13 21:33:00 CEST 2017
2-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip
31
distributionBase=GRADLE_USER_HOME
42
distributionPath=wrapper/dists
5-
zipStorePath=wrapper/dists
63
zipStoreBase=GRADLE_USER_HOME
4+
zipStorePath=wrapper/dists
5+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.3.1-bin.zip

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,7 @@ public interface AsyncEnumerable<T> {
4141
/** Constant and already completed CompletionStage signalling {@code false}. */
4242
CompletionStage<Boolean> FALSE = CompletableFuture.completedStage(false);
4343

44-
CompletionStage<Boolean> CANCELLED = CompletableFuture.failedStage(new CancellationException() {
45-
@Override
46-
public synchronized Throwable fillInStackTrace() {
47-
return this;
48-
}
49-
});
44+
CompletionStage<Boolean> CANCELLED = CompletableFuture.failedStage(new CancelledEnumeratorException());
5045

5146
// -------------------------------------------------------------------------------------
5247
// Static factories

src/main/java/hu/akarnokd/asyncenum/AsyncEnumeratorHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ static <T> boolean replace(AtomicReference<AsyncEnumerator<T>> target, AsyncEnum
5757
}
5858
}
5959
}
60+
61+
static boolean isCancelled(AsyncEnumerator<?> enumerator) {
62+
return enumerator == CANCELLED;
63+
}
6064
}

src/main/java/hu/akarnokd/asyncenum/AsyncFlatMap.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ static final class FlatMapEnumerator<T, R> implements AsyncEnumerator<R>, BiCons
5757

5858
final AtomicInteger upstreamWip;
5959

60+
final AtomicReference<Throwable> error;
61+
6062
R current;
6163

6264
volatile boolean cancelled;
@@ -70,6 +72,7 @@ static final class FlatMapEnumerator<T, R> implements AsyncEnumerator<R>, BiCons
7072
this.active = new AtomicInteger(1);
7173
this.inners = new ConcurrentHashMap<>();
7274
this.upstreamWip = new AtomicInteger();
75+
this.error = new AtomicReference<>();
7376
}
7477

7578
@Override
@@ -93,6 +96,10 @@ void drain() {
9396
do {
9497
CompletableFuture<Boolean> nx = next.get();
9598
if (nx != null) {
99+
if (error.get() != null) {
100+
nx.completeExceptionally(error.get());
101+
return;
102+
}
96103
int n = active.get();
97104
InnerAsyncEnumerator<R> inner = queue.peek();
98105

@@ -123,20 +130,35 @@ void finish(InnerAsyncEnumerator<R> inner) {
123130
drain();
124131
}
125132

133+
void error(InnerAsyncEnumerator<R> inner, Throwable ex) {
134+
error.compareAndSet(null, ex);
135+
cancel();
136+
active.decrementAndGet();
137+
drain();
138+
}
139+
126140
void moveNextUpstream() {
127-
if (upstreamWip.getAndIncrement() != 0) {
128-
return;
141+
if (upstreamWip.getAndIncrement() == 0) {
142+
do {
143+
upstream.moveNext().whenComplete(this);
144+
} while (upstreamWip.decrementAndGet() != 0);
129145
}
146+
}
130147

131-
do {
132-
upstream.moveNext().whenComplete(this);
133-
} while (upstreamWip.decrementAndGet() != 0);
148+
void cancelAllInner() {
149+
for (InnerAsyncEnumerator<R> inner : inners.keySet()) {
150+
inner.cancel();
151+
}
134152
}
135153

136154
@Override
137155
public void accept(Boolean aBoolean, Throwable throwable) {
138156
if (throwable != null) {
139-
// TODO manage errors
157+
error.compareAndSet(null, throwable);
158+
cancelAllInner();
159+
active.decrementAndGet();
160+
drain();
161+
return;
140162
}
141163
if (aBoolean) {
142164
T t = upstream.current();
@@ -160,9 +182,8 @@ public void accept(Boolean aBoolean, Throwable throwable) {
160182
@Override
161183
public void cancel() {
162184
cancelled = true;
163-
for (InnerAsyncEnumerator<R> inner : inners.keySet()) {
164-
inner.cancel();
165-
}
185+
upstream.cancel();
186+
cancelAllInner();
166187
}
167188

168189
static final class InnerAsyncEnumerator<R> extends AtomicInteger implements BiConsumer<Boolean, Throwable> {
@@ -181,19 +202,17 @@ R current() {
181202
}
182203

183204
void moveNext() {
184-
if (getAndIncrement() != 0) {
185-
return;
205+
if (getAndIncrement() == 0) {
206+
do {
207+
source.moveNext().whenComplete(this);
208+
} while (decrementAndGet() != 0);
186209
}
187-
188-
do {
189-
source.moveNext().whenComplete(this);
190-
} while (decrementAndGet() != 0);
191210
}
192211

193212
@Override
194213
public void accept(Boolean hasMore, Throwable throwable) {
195214
if (throwable != null) {
196-
// TODO error management
215+
parent.error(this, throwable);
197216
return;
198217
}
199218
if (hasMore) {

src/main/java/hu/akarnokd/asyncenum/AsyncFromCompletionStage.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package hu.akarnokd.asyncenum;
1818

19-
import java.util.concurrent.CompletionStage;
20-
import java.util.function.Function;
19+
import java.util.concurrent.*;
20+
import java.util.function.*;
2121

2222
final class AsyncFromCompletionStage<T> implements AsyncEnumerable<T> {
2323

@@ -33,10 +33,12 @@ public AsyncEnumerator<T> enumerator() {
3333
}
3434

3535
static final class FromCompletionStageAsyncEnumerable<T>
36-
implements AsyncEnumerator<T>, Function<T, Boolean> {
36+
implements AsyncEnumerator<T>, BiConsumer<T, Throwable> {
3737

3838
final CompletionStage<T> stage;
3939

40+
CompletableFuture<Boolean> completable;
41+
4042
boolean once;
4143

4244
T result;
@@ -52,7 +54,10 @@ public CompletionStage<Boolean> moveNext() {
5254
return FALSE;
5355
}
5456
once = true;
55-
return stage.thenApply(this);
57+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
58+
completable = cf;
59+
stage.whenComplete(this);
60+
return cf;
5661
}
5762

5863
@Override
@@ -61,9 +66,13 @@ public T current() {
6166
}
6267

6368
@Override
64-
public Boolean apply(T t) {
65-
result = t;
66-
return true;
69+
public void accept(T t, Throwable throwable) {
70+
if (throwable != null) {
71+
completable.completeExceptionally(throwable);
72+
} else {
73+
result = t;
74+
completable.complete(true);
75+
}
6776
}
6877
}
6978
}

src/main/java/hu/akarnokd/asyncenum/AsyncFromFlowPublisher.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,26 +71,25 @@ public T current() {
7171
return current;
7272
}
7373

74+
@Override
75+
public void cancel() {
76+
Flow.Subscription current = upstream.getAndSet(CancelledSubscription.CANCELLED);
77+
if (current != null && current != CancelledSubscription.CANCELLED) {
78+
current.cancel();
79+
}
80+
}
81+
7482
void deferredRequestOne() {
7583
Flow.Subscription current = upstream.get();
7684
if (current != null) {
7785
current.request(1);
7886
} else {
79-
for (;;) {
80-
long r = requested.get();
81-
long u = r + 1;
82-
if (u < 0L) {
83-
u = Long.MAX_VALUE;
84-
}
85-
if (requested.compareAndSet(r, u)) {
86-
current = upstream.get();
87-
if (current != null) {
88-
u = requested.getAndSet(0L);
89-
if (u != 0L) {
90-
current.request(u);
91-
}
92-
}
93-
break;
87+
requested.getAndIncrement();
88+
current = upstream.get();
89+
if (current != null) {
90+
long r = requested.getAndSet(0L);
91+
if (r != 0L) {
92+
current.request(r);
9493
}
9594
}
9695
}
@@ -157,4 +156,18 @@ void drain() {
157156
}
158157
}
159158
}
159+
160+
enum CancelledSubscription implements Flow.Subscription {
161+
CANCELLED;
162+
163+
@Override
164+
public void request(long n) {
165+
// Deliberately NO-OP
166+
}
167+
168+
@Override
169+
public void cancel() {
170+
// Deliberately NO-OP
171+
}
172+
}
160173
}

src/main/java/hu/akarnokd/asyncenum/AsyncMax.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static final class SumLongEnumerator<T> extends AtomicInteger
5050
T result;
5151
boolean done;
5252

53-
CompletableFuture<Boolean> cf;
53+
CompletableFuture<Boolean> completable;
5454

5555
SumLongEnumerator(AsyncEnumerator<T> source, Comparator<? super T> comparator) {
5656
this.source = source;
@@ -63,9 +63,9 @@ public CompletionStage<Boolean> moveNext() {
6363
result = null;
6464
return FALSE;
6565
}
66-
cf = new CompletableFuture<>();
66+
completable = new CompletableFuture<>();
6767
collectSource();
68-
return cf;
68+
return completable;
6969
}
7070

7171
@Override
@@ -85,7 +85,7 @@ void collectSource() {
8585
public void accept(Boolean aBoolean, Throwable throwable) {
8686
if (throwable != null) {
8787
done = true;
88-
cf.completeExceptionally(throwable);
88+
completable.completeExceptionally(throwable);
8989
return;
9090
}
9191

@@ -104,9 +104,9 @@ public void accept(Boolean aBoolean, Throwable throwable) {
104104
done = true;
105105
if (hasValue) {
106106
result = max;
107-
cf.complete(true);
107+
completable.complete(true);
108108
} else {
109-
cf.complete(false);
109+
completable.complete(false);
110110
}
111111
}
112112
}

src/main/java/hu/akarnokd/asyncenum/AsyncOnErrorResume.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package hu.akarnokd.asyncenum;
1818

1919
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicReference;
2021
import java.util.function.*;
2122

2223
final class AsyncOnErrorResume<T> implements AsyncEnumerable<T> {
@@ -39,7 +40,7 @@ static final class OnErrorResumeEnumerator<T> implements AsyncEnumerator<T>, BiC
3940

4041
final Function<? super Throwable, ? extends AsyncEnumerable<? extends T>> resumeMapper;
4142

42-
AsyncEnumerator<T> source;
43+
final AtomicReference<AsyncEnumerator<T>> source;
4344

4445
T result;
4546

@@ -48,15 +49,15 @@ static final class OnErrorResumeEnumerator<T> implements AsyncEnumerator<T>, BiC
4849
boolean inFallback;
4950

5051
OnErrorResumeEnumerator(AsyncEnumerator<T> source, Function<? super Throwable, ? extends AsyncEnumerable<? extends T>> resumeMapper) {
51-
this.source = source;
52+
this.source = new AtomicReference<>(source);
5253
this.resumeMapper = resumeMapper;
5354
}
5455

5556
@Override
5657
public CompletionStage<Boolean> moveNext() {
5758
CompletableFuture<Boolean> cf = new CompletableFuture<>();
5859
completable = cf;
59-
source.moveNext().whenComplete(this);
60+
source.getPlain().moveNext().whenComplete(this);
6061
return cf;
6162
}
6263

@@ -79,18 +80,24 @@ public void accept(Boolean aBoolean, Throwable throwable) {
7980
if (throwable != null) {
8081
inFallback = true;
8182
result = null;
82-
source = (AsyncEnumerator<T>)resumeMapper.apply(throwable).enumerator();
83-
source.moveNext().whenComplete(this);
83+
if (AsyncEnumeratorHelper.replace(source, (AsyncEnumerator<T>)resumeMapper.apply(throwable).enumerator())) {
84+
source.getPlain().moveNext().whenComplete(this);
85+
}
8486
return;
8587
}
8688
}
8789
if (aBoolean) {
88-
result = source.current();
90+
result = source.getPlain().current();
8991
cf.complete(true);
9092
} else {
9193
result = null;
9294
cf.complete(false);
9395
}
9496
}
97+
98+
@Override
99+
public void cancel() {
100+
AsyncEnumeratorHelper.cancel(source);
101+
}
95102
}
96103
}

0 commit comments

Comments
 (0)