Skip to content

Commit

Permalink
Add support for non-trivial Stream searching
Browse files Browse the repository at this point in the history
  • Loading branch information
runeflobakk committed Jan 5, 2024
1 parent d6c3f99 commit 9ec20f0
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/main/java/no/digipost/DiggCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import no.digipost.concurrent.OneTimeAssignment;
import no.digipost.stream.EmptyResultIfEmptySourceCollector;
import no.digipost.stream.NonEmptyStream;
import no.digipost.stream.SubjectFilter;
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -49,6 +51,37 @@
public final class DiggCollectors {


/**
* Create a collector used for finding and accumulating a specific result,
* where applying a filter is not adequate. The returned
* {@link SubjectFilter subject filter} is used for
* further specifying the final (compound) condition for the result to find.
* <p>
* When searching for a result <em>in context</em> of other elements, you must carefully
* ensure the source to have appropriate ordering and parallelity (or probably rather lack thereof)
* for the correct and expected operation of the collector.
* <p>
* Note: because {@link Collector collectors} are applied to <em>all</em> elements
* in a Stream, care should be taken to exclude non-applicable elements e.g. using
* a {@link Stream#filter(Predicate) filter}, and {@link Stream#limit(long) limit}
* especially for infinite Streams, before collecting.
*
*
* @param <T> The element type which is inspected by the subject filter. This type is
* typically the same as the element type of the Stream the final collector
* is applied to.
*
* @param subjectElement the predicate for selecting a subject element for further use
* in accumulating a result from applying the final collector.
*
* @return the subject filter for the collector, which must be further specified
* to build the final collector
*/
public static <T> SubjectFilter<T> find(Predicate<T> subjectElement) {
return new SubjectFilter<>(subjectElement);
}


/**
* A <em>multituple</em> is similar to a <em>multimap</em> in that it consists of one {@link Tuple#first() first} value and a List of
* values as the {@link Tuple#second() second} value,
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/no/digipost/stream/AtomicReferenceFolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

/**
* A consumer function for "folding" a value with an {@link AtomicReference}.
*
* @param <T> The type of the value this function folds.
*/
@FunctionalInterface
interface AtomicReferenceFolder<T> extends BiConsumer<AtomicReference<T>, T> {

static <T> AtomicReferenceFolder<T> clearReference() {
return (ref, value) -> ref.set(null);
}

static <T> AtomicReferenceFolder<T> keepFirst(Predicate<? super T> predicate) {
return (currentRef, candidateElement) ->
currentRef.accumulateAndGet(candidateElement, (current, candidate) -> current == null && predicate.test(candidate) ? candidate : current);
}

static <T> AtomicReferenceFolder<T> keepLast(Predicate<? super T> predicate) {
return (currentRef, candidateElement) -> {
if (predicate.test(candidateElement)) {
currentRef.set(candidateElement);
}
};
}

default AtomicReferenceFolder<T> doInsteadIf(Predicate<? super T> valuePredicate, AtomicReferenceFolder<T> foldOperation) {
return doInsteadIf((ref, value) -> valuePredicate.test(value), foldOperation);
}

default AtomicReferenceFolder<T> doInsteadIf(BiPredicate<? super AtomicReference<T>, ? super T> refAndValuePredicate, AtomicReferenceFolder<T> foldOperation) {
return (ref, value) -> {
if (refAndValuePredicate.test(ref, value)) {
foldOperation.accept(ref, value);
} else {
this.accept(ref, value);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collector.Characteristics.CONCURRENT;

final class AtomicReferenceFoldingCollector<T> implements Collector<T, AtomicReference<T>, Optional<T>> {

private final AtomicReferenceFolder<T> accumulator;
private final Set<Characteristics> characteristics;

AtomicReferenceFoldingCollector(AtomicReferenceFolder<T> accumulator) {
this(accumulator, EnumSet.of(CONCURRENT));
}

AtomicReferenceFoldingCollector(AtomicReferenceFolder<T> accumulator, Set<Characteristics> characteristics) {
this.accumulator = accumulator;
this.characteristics = unmodifiableSet(characteristics);
}


@Override
public AtomicReferenceFolder<T> accumulator() {
return accumulator;
}

@Override
public Set<Characteristics> characteristics() {
return characteristics;
}

/**
* The combiner, while thread-safe, has essentially undefined behavior if combining two
* found elements, because there is no way to tell if one or the other should be prioritized.
* In all cases of combining one found element with a not found, the found element will be
* returned from the function.
*/
@Override
public BinaryOperator<AtomicReference<T>> combiner() {
return (ref1, ref2) -> {
ref1.compareAndSet(null, ref2.get());
return ref1;
};
}

@Override
public Function<AtomicReference<T>, Optional<T>> finisher() {
return ref -> Optional.ofNullable(ref.get());
}

@Override
public Supplier<AtomicReference<T>> supplier() {
return AtomicReference::new;
}
}
60 changes: 60 additions & 0 deletions src/main/java/no/digipost/stream/SubjectFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) Posten Norge AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package no.digipost.stream;

import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collector;

import static java.util.function.Predicate.isEqual;
import static no.digipost.stream.AtomicReferenceFolder.clearReference;
import static no.digipost.stream.AtomicReferenceFolder.keepFirst;
import static no.digipost.stream.AtomicReferenceFolder.keepLast;

/**
* The initial subject filter for building a searching {@code Collector}.
* The {@link Collector} is acquired by invoking a method to finalize the
* (compound) condition for accumulating the result across the elements
* the collector will be applied to.
*
* @param <T> the type of elements this subject filter inspects
*
* @see #keepFirstNotFollowedBy(Predicate)
* @see #keepLastNotFollowedBy(Predicate)
*/
public final class SubjectFilter<T> {
private final Predicate<T> subjectElement;

public SubjectFilter(Predicate<T> subjectElement) {
this.subjectElement = subjectElement;
}

public Collector<T, ?, Optional<T>> keepFirstNotFollowedBy(T cancellingElement) {
return keepFirstNotFollowedBy(isEqual(cancellingElement));
}

public Collector<T, ?, Optional<T>> keepFirstNotFollowedBy(Predicate<? super T> cancellingElement) {
return new AtomicReferenceFoldingCollector<>(keepFirst(subjectElement).doInsteadIf(cancellingElement, clearReference()));
}

public Collector<T, ?, Optional<T>> keepLastNotFollowedBy(T cancellingElement) {
return keepLastNotFollowedBy(isEqual(cancellingElement));
}

public Collector<T, ?, Optional<T>> keepLastNotFollowedBy(Predicate<? super T> cancellingElement) {
return new AtomicReferenceFoldingCollector<>(keepLast(subjectElement).doInsteadIf(cancellingElement, clearReference()));
}
}
48 changes: 48 additions & 0 deletions src/test/java/no/digipost/DiggCollectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.quicktheories.core.Gen;
import uk.co.probablyfine.matchers.OptionalMatchers;

Expand All @@ -31,6 +34,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
Expand All @@ -39,6 +44,7 @@
import static no.digipost.DiggCollectors.allowAtMostOne;
import static no.digipost.DiggCollectors.allowAtMostOneOrElseThrow;
import static no.digipost.DiggCollectors.asSuppressedExceptionsOf;
import static no.digipost.DiggCollectors.find;
import static no.digipost.DiggCollectors.toMultimap;
import static no.digipost.DiggCollectors.toMultituple;
import static no.digipost.DiggCollectors.toSingleExceptionWithSuppressed;
Expand All @@ -50,6 +56,7 @@
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -207,4 +214,45 @@ public void allowAtMostOneFailsWithCustomException() {
assertThat(thrown, sameInstance(customException));
});
}

@Nested
class FindTest {

@Test
void emptyStream() {
assertThat(Stream.empty().collect(find(e -> true).keepLastNotFollowedBy(null)), whereNot(Optional::isPresent));
}

@Test
void findsLastItemInStreamNothingIsVoided() {
assertThat(Stream.of(1, 2, 3, 4).collect(find((Integer n) -> n > 2).keepLastNotFollowedBy(0)), contains(4));
}

@Test
void findsLastMatchingItemAndNotCancelledByAnyFollowingElement() {
assertThat(Stream.of(1, 2, 3, 4, 5, 6).collect(find((Integer n) -> n > 2 && n < 6).keepLastNotFollowedBy(0)), contains(5));
}

@Test
void findsLastMatchingItemNotFollowedByCertainValue() {
assertThat(Stream.of(1, 2, 3, 4, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepLastNotFollowedBy(8)), contains(5));
}

@Test
void findsFirstMatchingItemNotFollowedByCertainValue() {
assertThat(Stream.of(1, 2, 3, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepFirstNotFollowedBy(8)), contains(4));
}

@RepeatedTest(20) @Timeout(10)
void parallelStreamsWorksButBehaviorIsNotReallyDefined() {
Random random = new Random();
Optional<Integer> result = Optional.empty();
while(!result.isPresent()) {
result = IntStream.generate(() -> random.nextInt(1000)).limit(500_000).boxed()
.parallel()
.collect(find((Integer n) -> n < 100).keepLastNotFollowedBy(n -> n > 990));
}
assertThat(result, contains(lessThan(100)));
}
}
}

0 comments on commit 9ec20f0

Please sign in to comment.