Skip to content

Commit 8907ec6

Browse files
committed
Improve BlockingEnumeration
1 parent e17945b commit 8907ec6

File tree

1 file changed

+75
-77
lines changed

1 file changed

+75
-77
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/BlockingEnumeration.java

Lines changed: 75 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Enumeration;
2222
import java.util.NoSuchElementException;
2323
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.LinkedBlockingQueue;
2526

2627
import org.reactivestreams.Subscriber;
@@ -32,103 +33,70 @@
3233
/**
3334
* Implements the {@link Enumeration} interface by subscribing to a
3435
* {@link SdkPublisher} instance. The enumeration will buffer a fixed
35-
* number of events received from the publisher and block until the first is
36-
* consumed. Calls to {@link #hasMoreElements()} and {@link #nextElement()}
37-
* can also block while waiting for new events.
36+
* number of elements and only request new ones from the publisher
37+
* when they are consumed. Calls to {@link #hasMoreElements()} and
38+
* {@link #nextElement()} may block while waiting for new elements.
3839
* @param <T> the type of element.
3940
*/
4041
public final class BlockingEnumeration<T> implements Enumeration<T> {
41-
private static final class Token<T> {
42-
public T element;
43-
public Throwable error;
42+
private static final class Signal<T> {
43+
public final T element;
44+
public final Throwable error;
4445

45-
public Token() {
46-
}
47-
48-
public Token(T element) {
46+
public Signal(T element) {
4947
this.element = element;
48+
this.error = null;
5049
}
5150

52-
public Token(Throwable error) {
51+
public Signal(Throwable error) {
52+
this.element = null;
5353
this.error = error;
5454
}
5555
}
5656

57-
private final Token<T> END_TOKEN = new Token<>();
58-
private final BlockingQueue<Token<T>> blockingQueue;
59-
private Token<T> current = null;
57+
private final Signal<T> END_SIGNAL = new Signal<>((Throwable)null);
58+
private final CompletableFuture<Subscription> subscription = new CompletableFuture<>();
59+
private final BlockingQueue<Signal<T>> signalQueue;
60+
private final int bufferSize;
61+
private Signal<T> current = null;
6062

6163
/**
62-
* Create an enumeration with a fixed buffer size.
64+
* Create an enumeration with a fixed buffer size and an
65+
* optional injected first element.
6366
* @param publisher the publisher feeding the enumeration.
64-
* @param limit the buffer size.
67+
* @param bufferSize the buffer size.
68+
* @param firstElement (optional) first element the enumeration will return.
6569
*/
66-
public BlockingEnumeration(SdkPublisher<T> publisher, final int limit) {
67-
this.blockingQueue = new LinkedBlockingQueue<>(limit);
68-
publisher.subscribe(new Subscriber<T>() {
69-
private Subscription subscription;
70-
71-
@Override
72-
public void onSubscribe(Subscription s) {
73-
this.subscription = s;
74-
this.subscription.request(limit);
75-
}
76-
77-
@Override
78-
public void onNext(T t) {
79-
enqueue(new Token<>(t));
80-
subscription.request(1);
81-
}
82-
83-
@Override
84-
public void onError(Throwable t) {
85-
enqueue(new Token<>(t));
86-
}
87-
88-
@Override
89-
public void onComplete() {
90-
enqueue(END_TOKEN);
91-
}
92-
93-
private void enqueue(Token<T> token) {
94-
try {
95-
blockingQueue.put(token);
96-
} catch (InterruptedException e) {
97-
// TODO: log?
98-
}
99-
}
100-
});
70+
public BlockingEnumeration(SdkPublisher<T> publisher,
71+
final int bufferSize,
72+
final T firstElement) {
73+
this.signalQueue = new LinkedBlockingQueue<>();
74+
this.bufferSize = bufferSize;
75+
if (firstElement != null) {
76+
this.current = new Signal<>(firstElement);
77+
}
78+
publisher.subscribe(new EnumerationSubscriber());
10179
}
10280

10381
/**
104-
* Create an enumeration with a fixed buffer size and an
105-
* injected first element.
82+
* Create an enumeration with a fixed buffer size.
10683
* @param publisher the publisher feeding the enumeration.
107-
* @param limit the buffer size.
108-
* @param firstElement first element the enumeration will return.
84+
* @param bufferSize the buffer size.
10985
*/
11086
public BlockingEnumeration(SdkPublisher<T> publisher,
111-
final int limit,
112-
T firstElement) {
113-
this(publisher, limit);
114-
this.current = new Token<>(firstElement);
87+
final int bufferSize) {
88+
this(publisher, bufferSize, null);
11589
}
11690

11791
@Override
11892
public boolean hasMoreElements() {
11993
if (current == null) {
120-
current = dequeue();
121-
}
122-
return current != END_TOKEN;
123-
}
124-
125-
@Override
126-
public T nextElement() {
127-
if (current == null) {
128-
current = dequeue();
129-
}
130-
if (current == END_TOKEN) {
131-
throw new NoSuchElementException();
94+
try {
95+
current = signalQueue.take();
96+
} catch (InterruptedException e) {
97+
current = new Signal<>(e);
98+
subscription.thenAccept(Subscription::cancel);
99+
}
132100
}
133101
if (current.error != null) {
134102
if (current.error instanceof SdkException) {
@@ -137,17 +105,47 @@ public T nextElement() {
137105
throw SdkException.create("Unexpected error", current.error);
138106
}
139107
}
108+
return current != END_SIGNAL;
109+
}
110+
111+
@Override
112+
public T nextElement() {
113+
if (!hasMoreElements()) {
114+
throw new NoSuchElementException();
115+
}
140116
T element = current.element;
141117
current = null;
118+
subscription.thenAccept(s -> s.request(1));
142119
return element;
143120
}
144121

145-
private Token<T> dequeue() {
146-
try {
147-
return blockingQueue.take();
148-
} catch (InterruptedException e) {
149-
// TODO: log?
150-
return END_TOKEN;
122+
private final class EnumerationSubscriber implements Subscriber<T> {
123+
124+
@Override
125+
public void onSubscribe(Subscription s) {
126+
long request = bufferSize;
127+
if (current != null) {
128+
request--;
129+
}
130+
if (request > 0) {
131+
s.request(request);
132+
}
133+
subscription.complete(s);
134+
}
135+
136+
@Override
137+
public void onNext(T t) {
138+
signalQueue.add(new Signal<>(t));
139+
}
140+
141+
@Override
142+
public void onError(Throwable t) {
143+
signalQueue.add(new Signal<>(t));
144+
}
145+
146+
@Override
147+
public void onComplete() {
148+
signalQueue.add(END_SIGNAL);
151149
}
152150
}
153151
}

0 commit comments

Comments
 (0)