Skip to content

Commit

Permalink
add Publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Mar 5, 2024
1 parent 580327a commit b1357eb
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 8 deletions.
70 changes: 70 additions & 0 deletions kool/src/main/java/org/davidmoten/kool/Publisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.davidmoten.kool;

import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

public final class Publisher<T> implements Stream<T> {

private final StreamIterable<T> stream;
private final Queue<T> queue;

public Publisher(StreamIterable<T> stream) {
this.stream = stream;
this.queue = new ArrayDeque<>();
}

@Override
public StreamIterator<T> iterator() {
return new StreamIterator<T>() {

StreamIterator<T> it = stream.iteratorNullChecked();
boolean itHasNext = true;
boolean disposed = false;

@Override
public boolean hasNext() {
load();
if (itHasNext) {
return true;
} else {
return !queue.isEmpty();
}
}

@Override
public T next() {
load();
if (itHasNext) {
return it.next();
} else if (queue.isEmpty()) {
throw new NoSuchElementException();
} else {
return queue.poll();
}
}

@Override
public void dispose() {
if (!disposed) {
it.dispose();
}
disposed = true;
}

private void load() {
if (itHasNext) {
itHasNext = it.hasNext();
if (!itHasNext) {
it.dispose();
}
}
}
};
}

public void onNext(T value) {
queue.add(value);
}

}
4 changes: 4 additions & 0 deletions kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,10 @@ default <R extends Number> Single<Statistics> statistics(Function<? super T, ? e
default Stream<T> repeatLast() {
return repeatLast(Long.MAX_VALUE);
}

default Publisher<T> publish() {
return new Publisher<T>(this);
}

static Stream<Set<Integer>> powerSet(int n) {
return new PowerSet(n);
Expand Down
35 changes: 27 additions & 8 deletions kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static void checkTrue(Consumer<AtomicBoolean> consumer) {
public void testPrepend() {
Stream.of(1, 2, 3).prepend(0).test().assertValuesOnly(0, 1, 2, 3);
}

@Test
public void testPrependIterable() {
Stream.of(1, 2, 3).prepend(Arrays.asList(5, 6, 7)).test().assertValuesOnly(5, 6, 7, 1, 2, 3);
Expand Down Expand Up @@ -433,7 +433,7 @@ public Integer next() {
public void testConcatWith() {
Stream.of(1, 2).concatWith(Stream.of(3, 4)).test().assertValuesOnly(1, 2, 3, 4);
}

@Test
public void testConcatWithIterable() {
Stream.of(1, 2).concatWith(Arrays.asList(3, 4)).test().assertValuesOnly(1, 2, 3, 4);
Expand All @@ -458,7 +458,7 @@ public void testDoOnComplete() {
.forEach();
assertEquals(Lists.newArrayList(1, 2, 3), list);
}

@Test
public void testDoOnCompleteCount() {
List<Integer> list = new ArrayList<>();
Expand Down Expand Up @@ -592,27 +592,27 @@ public void testSkip() {
public void testSkipMoreThanAvailable() {
Stream.of(1, 2, 3, 4).skip(5).test().assertNoValuesOnly();
}

@Test
public void testSkipLast() {
Stream.of(1, 2, 3, 4).skipLast(2).test().assertValuesOnly(1, 2);
}

@Test
public void testSkipLastZero() {
Stream.of(1, 2, 3, 4).skipLast(0).test().assertValuesOnly(1, 2, 3, 4);
}

@Test
public void testSkipLastMoreThanAvailable() {
Stream.of(1, 2, 3, 4).skipLast(5).test().assertNoValuesOnly();
}

@Test(expected = NoSuchElementException.class)
public void testSkipLastNoSuchElement() {
Stream.empty().skipLast(5).iterator().next();
}

@Test(expected = IllegalArgumentException.class)
public void testSkipLastNegative() {
Stream.of(1, 2, 3, 4).skipLast(-1);
Expand Down Expand Up @@ -2117,6 +2117,25 @@ public void testFlatMapGeneratorEmptyWithOnFinish() {
assertEquals(Arrays.asList("a", "b"), list);
}

@Test
public void testPublisher() {
Publisher<Integer> p = Stream.of(1, 2, 3).publish();
p.onNext(4);
p.onNext(5);
StreamIterator<Integer> it = p.iterator();
assertEquals(1, it.next().intValue());
assertEquals(2, it.next().intValue());
assertEquals(3, it.next().intValue());
assertEquals(4, it.next().intValue());
assertEquals(5, it.next().intValue());
p.onNext(6);
assertEquals(6, it.next().intValue());
assertFalse(it.hasNext());

// queue empty
assertEquals(3, p.count().get().intValue());
}

public static void main(String[] args) throws MalformedURLException {
URL url = new URL("https://doesnotexist.zz");
Stream.using(() -> url.openStream(), in -> Stream.bytes(in))
Expand Down

0 comments on commit b1357eb

Please sign in to comment.