Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes memory leak of grouped iterator #1907

Merged
merged 8 commits into from
Mar 12, 2017
67 changes: 35 additions & 32 deletions javaslang/src/main/java/javaslang/collection/Iterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import javaslang.Tuple3;
import javaslang.collection.IteratorModule.ConcatIterator;
import javaslang.collection.IteratorModule.DistinctIterator;
import javaslang.collection.IteratorModule.GroupedIterator;
import javaslang.control.Option;

import java.math.BigDecimal;
Expand All @@ -20,6 +21,7 @@
import static java.lang.Double.NEGATIVE_INFINITY;
import static java.lang.Double.POSITIVE_INFINITY;
import static java.math.RoundingMode.HALF_UP;
import static javaslang.API.*;
import static javaslang.collection.IteratorModule.BigDecimalHelper.areEqual;
import static javaslang.collection.IteratorModule.BigDecimalHelper.asDecimal;
import static javaslang.collection.IteratorModule.EmptyIterator;
Expand Down Expand Up @@ -1535,9 +1537,9 @@ default <C> Map<C, Iterator<T>> groupBy(Function<? super T, ? extends C> classif

@Override
default Iterator<Seq<T>> grouped(int size) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala's grouped() and sliding() functions return a GroupedIterator<Seq<T>>. That GroupedIterator has the ability of padding and skipping partial results (both not implemented in our version, yet).

It would be nice if we could provide the same functionality. Returning a GroupedIterator<Seq<T>> instead of Iterator<Seq<T>> will not be backward compatible. I will target that change for 3.0.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I internally based the buffer on our persistent Vector, Scala users a mutable ArrayBuffer. Our version is blazing fast (a little slower than the mutable version). Congrats @paplorinc !!!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, could you share some benchmarks :)?

return sliding(size, size);
return new GroupedIterator<>(this, size, size);
}

@Override
default boolean hasDefiniteSize() {
return false;
Expand Down Expand Up @@ -1827,37 +1829,9 @@ default Iterator<Seq<T>> sliding(int size) {

@Override
default Iterator<Seq<T>> sliding(int size, int step) {
if (size <= 0 || step <= 0) {
throw new IllegalArgumentException("size: " + size + " or step: " + step + " not positive");
}
if (!hasNext()) {
return empty();
} else {
final Stream<T> source = Stream.ofAll(this);
return new AbstractIterator<Seq<T>>() {
private Stream<T> that = source;
private IndexedSeq<T> next = null;

@Override
public boolean hasNext() {
while (next == null && !that.isEmpty()) {
final Tuple2<Stream<T>, Stream<T>> split = that.splitAt(size);
next = split._1.toVector();
that = split._2.isEmpty() ? Stream.empty() : that.drop(step);
}
return next != null;
}

@Override
public IndexedSeq<T> getNext() {
final IndexedSeq<T> result = next;
next = null;
return result;
}
};
}
return new GroupedIterator<>(this, size, step);
}

@Override
default Tuple2<Iterator<T>, Iterator<T>> span(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
Expand Down Expand Up @@ -2089,6 +2063,35 @@ public String toString() {
}
}

final class GroupedIterator<T> extends AbstractIterator<Seq<T>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paplorinc I simplified it...


private final Iterator<T> that;
private final int step;

private Seq<T> buffer;

GroupedIterator(Iterator<T> that, int size, int step) {
if (size < 1 || step < 1) {
throw new IllegalArgumentException("size (" + size + ") and step (" + step + ") must both be positive");
}
this.that = that;
this.step = step;
this.buffer = Vector.ofAll(that.take(size));
}

@Override
public boolean hasNext() {
return !buffer.isEmpty();
}

@Override
protected Seq<T> getNext() {
final Seq<T> result = buffer;
buffer = that.hasNext() ? buffer.appendAll(that.take(step)).drop(step) : buffer.take(0);
return result;
}
}

final class BigDecimalHelper {

@GwtIncompatible("Math::nextDown is not implemented")
Expand Down