17
17
package hu .akarnokd .asyncenum ;
18
18
19
19
import java .util .concurrent .*;
20
- import java .util .concurrent .atomic .AtomicInteger ;
20
+ import java .util .concurrent .atomic .* ;
21
21
import java .util .function .BiConsumer ;
22
22
23
23
final class AsyncConcatArray <T > implements AsyncEnumerable <T > {
@@ -38,33 +38,36 @@ static final class ConcatArrayEnumerator<T> extends AtomicInteger
38
38
39
39
final AsyncEnumerable <T >[] sources ;
40
40
41
- AsyncEnumerator <T > currentEnumerator ;
41
+ final AtomicReference < AsyncEnumerator <T > > currentEnumerator ;
42
42
43
43
CompletableFuture <Boolean > currentStage ;
44
44
45
45
int index ;
46
46
47
47
ConcatArrayEnumerator (AsyncEnumerable <T >[] sources ) {
48
48
this .sources = sources ;
49
+ this .currentEnumerator = new AtomicReference <>();
49
50
}
50
51
51
52
@ Override
52
53
public CompletionStage <Boolean > moveNext () {
53
- if (currentEnumerator == null ) {
54
+ if (currentEnumerator . get () == null ) {
54
55
if (index == sources .length ) {
55
56
return FALSE ;
56
57
}
57
- currentEnumerator = sources [index ++].enumerator ();
58
+ if (!AsyncEnumeratorHelper .replace (currentEnumerator , sources [index ++].enumerator ())) {
59
+ return CANCELLED ;
60
+ }
58
61
}
59
62
60
63
currentStage = new CompletableFuture <>();
61
- currentEnumerator .moveNext ().whenComplete (this );
64
+ currentEnumerator .getPlain (). moveNext ().whenComplete (this );
62
65
return currentStage ;
63
66
}
64
67
65
68
@ Override
66
69
public T current () {
67
- return currentEnumerator .current ();
70
+ return currentEnumerator .getPlain (). current ();
68
71
}
69
72
70
73
@ Override
@@ -82,11 +85,20 @@ public void accept(Boolean aBoolean, Throwable throwable) {
82
85
currentStage .complete (false );
83
86
break ;
84
87
}
85
- currentEnumerator = sources [index ++].enumerator ();
86
- currentEnumerator .moveNext ().whenComplete (this );
88
+ AsyncEnumerator <T > en = sources [index ++].enumerator ();
89
+ if (AsyncEnumeratorHelper .replace (currentEnumerator , en )) {
90
+ en .moveNext ().whenComplete (this );
91
+ } else {
92
+ break ;
93
+ }
87
94
} while (decrementAndGet () != 0 );
88
95
}
89
96
}
90
97
}
98
+
99
+ @ Override
100
+ public void cancel () {
101
+ AsyncEnumeratorHelper .cancel (currentEnumerator );
102
+ }
91
103
}
92
104
}
0 commit comments