Skip to content

Commit a43aae0

Browse files
committed
Additional operators
1 parent 452b2c7 commit a43aae0

17 files changed

+1359
-3
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Prototype Java 9 library based on the asynchronous enumerable concept (where mov
99
### Gradle
1010

1111
```groovy
12-
compile "com.github.akarnokd:async-enumerable:0.3.0"
12+
compile "com.github.akarnokd:async-enumerable:0.4.0"
1313
```
1414

1515
### Getting started

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=0.3.0
1+
version=0.4.0
22
org.gradle.jvmargs=-XX:+IgnoreUnrecognizedVMOptions --permit-illegal-access --show-version
33

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.*;
20+
import java.util.concurrent.ExecutionException;
21+
22+
final class AsyncBlockingIterable<T> implements Iterable<T> {
23+
24+
final AsyncEnumerable<T> source;
25+
26+
AsyncBlockingIterable(AsyncEnumerable<T> source) {
27+
this.source = source;
28+
}
29+
30+
@Override
31+
public Iterator<T> iterator() {
32+
return new BlockingIterator<>(source.enumerator());
33+
}
34+
35+
static final class BlockingIterator<T> implements Iterator<T> {
36+
37+
final AsyncEnumerator<T> source;
38+
39+
boolean hasValue;
40+
41+
boolean done;
42+
43+
T value;
44+
45+
BlockingIterator(AsyncEnumerator<T> source) {
46+
this.source = source;
47+
}
48+
49+
@Override
50+
public boolean hasNext() {
51+
if (!hasValue && !done) {
52+
try {
53+
Boolean b = source
54+
.moveNext()
55+
.toCompletableFuture()
56+
.get();
57+
if (b) {
58+
hasValue = true;
59+
value = source.current();
60+
} else {
61+
done = true;
62+
}
63+
} catch (InterruptedException ex) {
64+
throw new RuntimeException(ex);
65+
} catch (ExecutionException ex) {
66+
throw ThrowableHelper.wrapOrThrow(ex.getCause());
67+
}
68+
}
69+
return hasValue;
70+
}
71+
72+
@Override
73+
public T next() {
74+
if (hasValue || hasNext()) {
75+
T v = value;
76+
value = null;
77+
hasValue = false;
78+
return v;
79+
}
80+
throw new NoSuchElementException();
81+
}
82+
}
83+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.*;
20+
import java.util.concurrent.*;
21+
import java.util.concurrent.atomic.*;
22+
import java.util.function.BiConsumer;
23+
24+
final class AsyncCache<T> extends AtomicInteger
25+
implements AsyncEnumerable<T>, BiConsumer<Boolean, Throwable> {
26+
27+
final AsyncEnumerable<T> source;
28+
29+
final AtomicBoolean once;
30+
31+
final AtomicReference<CacheEnumerator<T>[]> enumerators;
32+
33+
@SuppressWarnings("unchecked")
34+
static final CacheEnumerator[] EMPTY = new CacheEnumerator[0];
35+
36+
@SuppressWarnings("unchecked")
37+
static final CacheEnumerator[] TERMINATED = new CacheEnumerator[0];
38+
39+
final List<T> list;
40+
41+
AsyncEnumerator<T> sourceEnumerator;
42+
43+
volatile int size;
44+
45+
volatile boolean done;
46+
Throwable error;
47+
48+
@SuppressWarnings("unchecked")
49+
AsyncCache(AsyncEnumerable<T> source) {
50+
this.source = source;
51+
this.once = new AtomicBoolean();
52+
this.enumerators = new AtomicReference<>(EMPTY);
53+
this.list = new ArrayList<>();
54+
}
55+
56+
@SuppressWarnings("unchecked")
57+
@Override
58+
public void accept(Boolean aBoolean, Throwable throwable) {
59+
if (throwable != null) {
60+
error = throwable;
61+
done = true;
62+
for (CacheEnumerator<T> en : enumerators.getAndSet(TERMINATED)) {
63+
signal(en);
64+
}
65+
return;
66+
}
67+
68+
if (aBoolean) {
69+
list.add(sourceEnumerator.current());
70+
size = size + 1;
71+
for (CacheEnumerator<T> en : enumerators.getAcquire()) {
72+
signal(en);
73+
}
74+
nextSource();
75+
} else {
76+
done = true;
77+
for (CacheEnumerator<T> en : enumerators.getAndSet(TERMINATED)) {
78+
signal(en);
79+
}
80+
}
81+
}
82+
83+
@Override
84+
public AsyncEnumerator<T> enumerator() {
85+
CacheEnumerator<T> en = new CacheEnumerator<>(this);
86+
if (add(en)) {
87+
if (!once.get() && once.compareAndSet(false, true)) {
88+
sourceEnumerator = source.enumerator();
89+
nextSource();
90+
}
91+
}
92+
signal(en);
93+
return en;
94+
}
95+
96+
void nextSource() {
97+
if (getAndIncrement() == 0) {
98+
do {
99+
sourceEnumerator.moveNext().whenComplete(this);
100+
} while (decrementAndGet() != 0);
101+
}
102+
}
103+
104+
boolean add(CacheEnumerator<T> inner) {
105+
for (;;) {
106+
CacheEnumerator<T>[] a = enumerators.get();
107+
if (a == TERMINATED) {
108+
return false;
109+
}
110+
int n = a.length;
111+
@SuppressWarnings("unchecked")
112+
CacheEnumerator<T>[] b = new CacheEnumerator[n + 1];
113+
System.arraycopy(a, 0, b, 0, n);
114+
b[n] = inner;
115+
if (enumerators.compareAndSet(a, b)) {
116+
return true;
117+
}
118+
}
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
void remove(CacheEnumerator<T> inner) {
123+
for (;;) {
124+
CacheEnumerator<T>[] a = enumerators.get();
125+
int n = a.length;
126+
if (n == 0) {
127+
break;
128+
}
129+
int j = -1;
130+
for (int i = 0; i < n; i++) {
131+
if (a[i] == inner) {
132+
j = i;
133+
break;
134+
}
135+
}
136+
137+
if (j < 0) {
138+
break;
139+
}
140+
CacheEnumerator<T>[] b;
141+
if (n == 1) {
142+
b = EMPTY;
143+
} else {
144+
b = new CacheEnumerator[n - 1];
145+
System.arraycopy(a, 0, b, 0, j);
146+
System.arraycopy(a, j + 1, b, j, n - j - 1);
147+
}
148+
if (enumerators.compareAndSet(a, b)) {
149+
break;
150+
}
151+
}
152+
}
153+
154+
void signal(CacheEnumerator<T> target) {
155+
if (target.getAndIncrement() == 0) {
156+
do {
157+
CompletableFuture<Boolean> cf = target.completable;
158+
if (cf != null) {
159+
int index = target.index;
160+
161+
boolean d = done;
162+
int s = size;
163+
boolean empty = s == index;
164+
165+
if (d && empty) {
166+
target.completable = null;
167+
Throwable ex = error;
168+
if (ex != null) {
169+
cf.completeExceptionally(ex);
170+
} else {
171+
cf.complete(false);
172+
}
173+
}
174+
175+
if (!empty) {
176+
target.result = list.get(index);
177+
target.index = index + 1;
178+
target.completable = null;
179+
cf.complete(true);
180+
}
181+
}
182+
} while (target.decrementAndGet() != 0);
183+
}
184+
}
185+
186+
static final class CacheEnumerator<T> extends AtomicInteger implements AsyncEnumerator<T> {
187+
188+
final AsyncCache<T> parent;
189+
190+
volatile CompletableFuture<Boolean> completable;
191+
192+
int index;
193+
194+
T result;
195+
196+
CacheEnumerator(AsyncCache<T> parent) {
197+
this.parent = parent;
198+
}
199+
200+
@Override
201+
public CompletionStage<Boolean> moveNext() {
202+
result = null;
203+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
204+
completable = cf;
205+
parent.signal(this);
206+
return cf;
207+
}
208+
209+
@Override
210+
public T current() {
211+
return result;
212+
}
213+
214+
@Override
215+
public void cancel() {
216+
parent.remove(this);
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)