From ac4071666b4e35c6a41b65efb63148100ed606ab Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 14 Feb 2024 15:46:33 -0500 Subject: [PATCH] Apply spotless to reformat --- .../fn/harness/state/FnApiStateAccessor.java | 26 +- .../harness/state/OrderedListUserState.java | 68 +++-- .../harness/state/StateFetchingIterators.java | 10 +- .../harness/state/FakeBeamFnStateClient.java | 213 ++++++------- .../state/OrderedListUserStateTest.java | 285 +++++++++++------- 5 files changed, 341 insertions(+), 261 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java index 202d1d3489a98..357e24744fc29 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java @@ -616,13 +616,13 @@ public OrderedListState bindOrderedList( @Override public Object apply(StateKey key) { return new OrderedListState() { - private final OrderedListUserState impl = createOrderedListUserState( - key, elemCoder); + private final OrderedListUserState impl = + createOrderedListUserState(key, elemCoder); @Override public void clear() { - clearRange(BoundedWindow.TIMESTAMP_MIN_VALUE, - BoundedWindow.TIMESTAMP_MAX_VALUE); + clearRange( + BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); } @Override @@ -648,18 +648,19 @@ public ReadableState readLater() { @Nullable @Override public Iterable> read() { - return readRange(BoundedWindow.TIMESTAMP_MIN_VALUE, - BoundedWindow.TIMESTAMP_MAX_VALUE); + return readRange( + BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); } @Override - public GroupingState, Iterable>> readLater() { + public GroupingState, Iterable>> + readLater() { throw new UnsupportedOperationException(); } @Override - public Iterable> readRange(Instant minTimestamp, - Instant limitTimestamp) { + public Iterable> readRange( + Instant minTimestamp, Instant limitTimestamp) { return impl.readRange(minTimestamp, limitTimestamp); } @@ -669,8 +670,8 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) { } @Override - public OrderedListState readRangeLater(Instant minTimestamp, - Instant limitTimestamp) { + public OrderedListState readRangeLater( + Instant minTimestamp, Instant limitTimestamp) { throw new UnsupportedOperationException(); } }; @@ -925,7 +926,8 @@ private StateKey createMultimapKeysUserStateKey(String stateId) { return builder.build(); } - private OrderedListUserState createOrderedListUserState(StateKey stateKey, Coder valueCoder) { + private OrderedListUserState createOrderedListUserState( + StateKey stateKey, Coder valueCoder) { OrderedListUserState rval = new OrderedListUserState<>( getCacheFor(stateKey), diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java index a8e1f96df249e..8734a9b2db5e7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java @@ -24,34 +24,25 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import org.apache.beam.fn.harness.Cache; import org.apache.beam.fn.harness.Caches; import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange; import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateUpdateRequest; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetRequest; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetResponse; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.stream.PrefetchableIterable; import org.apache.beam.sdk.fn.stream.PrefetchableIterables; import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -121,11 +112,12 @@ public Iterable> readRange(Instant minTimestamp, Instant lim // (1) a sort key is added to or removed from pendingAdds, or // (2) a new value is added to an existing sort key ArrayList>> pendingAddsInRange = new ArrayList<>(); - for (Entry> kv : pendingAdds.subMap(minTimestamp, - limitTimestamp).entrySet()) { - pendingAddsInRange.add(PrefetchableIterables.limit( - Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())), - kv.getValue().size())); + for (Entry> kv : + pendingAdds.subMap(minTimestamp, limitTimestamp).entrySet()) { + pendingAddsInRange.add( + PrefetchableIterables.limit( + Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())), + kv.getValue().size())); } Iterable> valuesInRange = Iterables.concat(pendingAddsInRange); @@ -140,7 +132,9 @@ public Iterable> readRange(Instant minTimestamp, Instant lim // TODO: consider use cache here CachingStateIterable> persistentValues = StateFetchingIterators.readAllAndDecodeStartingFrom( - Caches.noop(), this.beamFnStateClient, getRequestBuilder.build(), + Caches.noop(), + this.beamFnStateClient, + getRequestBuilder.build(), this.timestampedValueCoder); // Make a snapshot of the current pendingRemoves and use them to filter persistent values. @@ -148,10 +142,12 @@ public Iterable> readRange(Instant minTimestamp, Instant lim // pre-existing iterables even after a sort key is removed. TreeRangeSet pendingRemovesSnapshot = TreeRangeSet.create(pendingRemoves); Iterable> persistentValuesAfterRemoval = - Iterables.filter(persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp())); + Iterables.filter( + persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp())); - return Iterables.mergeSorted(ImmutableList.of(persistentValuesAfterRemoval, - valuesInRange), Comparator.comparing(TimestampedValue::getTimestamp)); + return Iterables.mergeSorted( + ImmutableList.of(persistentValuesAfterRemoval, valuesInRange), + Comparator.comparing(TimestampedValue::getTimestamp)); } return valuesInRange; @@ -177,7 +173,8 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) { // pre-existing iterables even after the sort key is cleared. pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear(); if (!isCleared) - pendingRemoves.add(Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN)); + pendingRemoves.add( + Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN)); } public void clear() { @@ -197,23 +194,30 @@ public void clear() { public void asyncClose() throws Exception { isClosed = true; - OrderedListStateUpdateRequest.Builder updateRequestBuilder = OrderedListStateUpdateRequest.newBuilder(); + OrderedListStateUpdateRequest.Builder updateRequestBuilder = + OrderedListStateUpdateRequest.newBuilder(); if (!pendingRemoves.isEmpty()) { - updateRequestBuilder - .addAllDeletes(Iterables.transform(pendingRemoves.asRanges(), - (r) -> OrderedListRange.newBuilder() - .setStart(r.lowerEndpoint().getMillis()) - .setEnd(r.upperEndpoint().getMillis()).build())); + updateRequestBuilder.addAllDeletes( + Iterables.transform( + pendingRemoves.asRanges(), + (r) -> + OrderedListRange.newBuilder() + .setStart(r.lowerEndpoint().getMillis()) + .setEnd(r.upperEndpoint().getMillis()) + .build())); pendingRemoves.clear(); } if (!pendingAdds.isEmpty()) { for (Entry> entry : pendingAdds.entrySet()) { - updateRequestBuilder - .addAllInserts(Iterables.transform(entry.getValue(), - (v) -> OrderedListEntry.newBuilder() - .setSortKey(entry.getKey().getMillis()) - .setData(encodeValue(v)).build())); + updateRequestBuilder.addAllInserts( + Iterables.transform( + entry.getValue(), + (v) -> + OrderedListEntry.newBuilder() + .setSortKey(entry.getKey().getMillis()) + .setData(encodeValue(v)) + .build())); } pendingAdds.clear(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java index b9ad0fe6f8c95..db3742474082b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java @@ -37,7 +37,6 @@ import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable.Blocks; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.WeightedList; @@ -579,7 +578,8 @@ public CompletableFuture loadPrefetchedResponse(ByteString contin stateRequestForFirstChunk .toBuilder() .setOrderedListGet( - stateRequestForFirstChunk.getOrderedListGet() + stateRequestForFirstChunk + .getOrderedListGet() .toBuilder() .setContinuationToken(continuationToken))); } else { @@ -620,8 +620,7 @@ public ByteString next() { ByteString tokenFromResponse; if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE) tokenFromResponse = stateResponse.getOrderedListGet().getContinuationToken(); - else - tokenFromResponse = stateResponse.getGet().getContinuationToken(); + else tokenFromResponse = stateResponse.getGet().getContinuationToken(); // If the continuation token is empty, that means we have reached EOF. if (ByteString.EMPTY.equals(tokenFromResponse)) { @@ -634,8 +633,7 @@ public ByteString next() { ByteString ret; if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE) ret = stateResponse.getOrderedListGet().getData(); - else - ret = stateResponse.getGet().getData(); + else ret = stateResponse.getGet().getData(); return ret; } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java index 4cee9c5f93d01..8691b9998d074 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java @@ -32,7 +32,6 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry; import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange; import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetResponse; @@ -46,18 +45,15 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; import org.joda.time.Instant; /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */ @@ -117,14 +113,16 @@ public FakeBeamFnStateClient(Map, List>> initialData, i return chunks; })); - Map> orderedListInitialData = new HashMap<>(Maps.transformValues( - Maps.filterKeys(initialData, - (k) -> k.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE), - (v) -> { - // make sure the provided coder is a TimestampedValueCoder. - assert v.getKey() instanceof TimestampedValueCoder; - return ((TimestampedValueCoder) v.getKey()).getValueCoder(); - })); + Map> orderedListInitialData = + new HashMap<>( + Maps.transformValues( + Maps.filterKeys( + initialData, (k) -> k.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE), + (v) -> { + // make sure the provided coder is a TimestampedValueCoder. + assert v.getKey() instanceof TimestampedValueCoder; + return ((TimestampedValueCoder) v.getKey()).getValueCoder(); + })); this.orderedListKeys = new HashMap<>(); for (Map.Entry> entry : orderedListInitialData.entrySet()) { @@ -133,7 +131,9 @@ public FakeBeamFnStateClient(Map, List>> initialData, i StateKey.Builder keyBuilder = entry.getKey().toBuilder(); keyBuilder.getOrderedListUserStateBuilder().clearSortKey(); - this.orderedListKeys.computeIfAbsent(keyBuilder.build(), (unused) -> new TreeSet<>()).add(sortKey); + this.orderedListKeys + .computeIfAbsent(keyBuilder.build(), (unused) -> new TreeSet<>()) + .add(sortKey); } this.data = @@ -181,25 +181,26 @@ public CompletableFuture handle(StateRequest.Builder requestBuild } switch (request.getRequestCase()) { - case GET: { - List byteStrings = - data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY)); - int block = 0; - if (request.getGet().getContinuationToken().size() > 0) { - block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8()); - } - ByteString returnBlock = byteStrings.get(block); - ByteString continuationToken = ByteString.EMPTY; - if (byteStrings.size() > block + 1) { - continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1)); + case GET: + { + List byteStrings = + data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY)); + int block = 0; + if (request.getGet().getContinuationToken().size() > 0) { + block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8()); + } + ByteString returnBlock = byteStrings.get(block); + ByteString continuationToken = ByteString.EMPTY; + if (byteStrings.size() > block + 1) { + continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1)); + } + response = + StateResponse.newBuilder() + .setGet( + StateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); } - response = - StateResponse.newBuilder() - .setGet( - StateGetResponse.newBuilder() - .setData(returnBlock) - .setContinuationToken(continuationToken)); - } break; case CLEAR: @@ -214,83 +215,87 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; - case ORDERED_LIST_GET: { - long start = request.getOrderedListGet().getRange().getStart(); - long end = request.getOrderedListGet().getRange().getEnd(); - - KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); - long sortKey = start; - int index = 0; - if (request.getOrderedListGet().getContinuationToken().size() > 0) { - try { - // The continuation format here is the sort key (long) followed by an index (int) - KV cursor = coder.decode(request.getOrderedListGet(). - getContinuationToken().newInput()); - sortKey = cursor.getKey(); - index = cursor.getValue(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - ByteString continuationToken; - ByteString returnBlock = ByteString.EMPTY;; - try { - if (sortKey < start || sortKey >= end) { - throw new IndexOutOfBoundsException("sort key out of range"); + case ORDERED_LIST_GET: + { + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { + try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - NavigableSet subset = orderedListKeys - .getOrDefault(request.getStateKey(), new TreeSet<>()) - .subSet(sortKey, true, end, false); + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { + if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); + } - // get the effective sort key currently, can throw NoSuchElementException - Long nextSortKey = subset.first(); + NavigableSet subset = + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(sortKey, true, end, false); - StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); - keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); - List byteStrings = - data.getOrDefault(keyBuilder.build(), - Collections.singletonList(ByteString.EMPTY)); - - // get the block specified in continuation token, can throw IndexOutOfBoundsException - returnBlock = byteStrings.get(index); - - if (byteStrings.size() > index + 1) { - // more blocks from this sort key - index += 1; - } else { - // finish navigating the current sort key and need to find the next one, - // can throw NoSuchElementException - nextSortKey = subset.tailSet(nextSortKey, false).first(); - index = 0; - } + // get the effective sort key currently, can throw NoSuchElementException + Long nextSortKey = subset.first(); - ByteStringOutputStream outputStream = new ByteStringOutputStream(); - try { - KV cursor = KV.of(nextSortKey, index); - coder.encode(cursor, outputStream); - } catch (IOException e) { - throw new RuntimeException(e); + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); + List byteStrings = + data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + + // get the block specified in continuation token, can throw IndexOutOfBoundsException + returnBlock = byteStrings.get(index); + + if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; + } else { + // finish navigating the current sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; + } + + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { + continuationToken = ByteString.EMPTY; } - continuationToken = outputStream.toByteString(); - } catch (NoSuchElementException|IndexOutOfBoundsException e) { - continuationToken = ByteString.EMPTY; + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); } - response = - StateResponse.newBuilder() - .setOrderedListGet( - OrderedListStateGetResponse.newBuilder() - .setData(returnBlock) - .setContinuationToken(continuationToken)); - } - break; + break; case ORDERED_LIST_UPDATE: for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { - List keysToRemove = new ArrayList<>( - orderedListKeys.getOrDefault(request.getStateKey(), new TreeSet<>()) - .subSet(r.getStart(), true, r.getEnd(), false)); + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); for (Long l : keysToRemove) { StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); @@ -319,11 +324,13 @@ public CompletableFuture handle(StateRequest.Builder requestBuild data.computeIfAbsent(keyBuilder.build(), (unused) -> new ArrayList<>()); previousValues.add(output); - orderedListKeys.computeIfAbsent(request.getStateKey(), (unused) -> new TreeSet<>()) + orderedListKeys + .computeIfAbsent(request.getStateKey(), (unused) -> new TreeSet<>()) .add(e.getSortKey()); } - response = StateResponse.newBuilder() - .setOrderedListUpdate(OrderedListStateUpdateResponse.getDefaultInstance()); + response = + StateResponse.newBuilder() + .setOrderedListUpdate(OrderedListStateUpdateResponse.getDefaultInstance()); break; default: diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java index 0f41c75e52c66..65d665c8ea515 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.List; import org.apache.beam.fn.harness.Caches; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -43,12 +42,18 @@ @RunWith(JUnit4.class) public class OrderedListUserStateTest { - private static final TimestampedValue A1 = TimestampedValue.of("A1", Instant.ofEpochMilli(1)); - private static final TimestampedValue B1 = TimestampedValue.of("B1", Instant.ofEpochMilli(1)); - private static final TimestampedValue A2 = TimestampedValue.of("A2", Instant.ofEpochMilli(2)); - private static final TimestampedValue B2 = TimestampedValue.of("B2", Instant.ofEpochMilli(2)); - private static final TimestampedValue A3 = TimestampedValue.of("A3", Instant.ofEpochMilli(3)); - private static final TimestampedValue A4 = TimestampedValue.of("A4", Instant.ofEpochMilli(4)); + private static final TimestampedValue A1 = + TimestampedValue.of("A1", Instant.ofEpochMilli(1)); + private static final TimestampedValue B1 = + TimestampedValue.of("B1", Instant.ofEpochMilli(1)); + private static final TimestampedValue A2 = + TimestampedValue.of("A2", Instant.ofEpochMilli(2)); + private static final TimestampedValue B2 = + TimestampedValue.of("B2", Instant.ofEpochMilli(2)); + private static final TimestampedValue A3 = + TimestampedValue.of("A3", Instant.ofEpochMilli(3)); + private static final TimestampedValue A4 = + TimestampedValue.of("A4", Instant.ofEpochMilli(4)); private final String pTransformId = "pTransformId"; private final String stateId = "stateId"; @@ -81,7 +86,8 @@ public void testRead() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - assertArrayEquals(asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.asyncClose(); assertThrows(IllegalStateException.class, () -> userState.read()); } @@ -91,9 +97,10 @@ public void testReadRange() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1), - createOrderedListStateKey("A", 4), Collections.singletonList(A4), - createOrderedListStateKey("A", 2), Collections.singletonList(A2))); + ImmutableMap.of( + createOrderedListStateKey("A", 1), asList(A1, B1), + createOrderedListStateKey("A", 4), Collections.singletonList(A4), + createOrderedListStateKey("A", 2), Collections.singletonList(A2))); OrderedListUserState userState = new OrderedListUserState<>( @@ -103,28 +110,38 @@ public void testReadRange() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - Iterable> stateBeforeB2 = userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); - assertArrayEquals(Collections.singletonList(A2).toArray(), + Iterable> stateBeforeB2 = + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + assertArrayEquals( + Collections.singletonList(A2).toArray(), Iterables.toArray(stateBeforeB2, TimestampedValue.class)); // Add a new value to an existing sort key userState.add(B2); - assertArrayEquals(Collections.singletonList(A2).toArray(), + assertArrayEquals( + Collections.singletonList(A2).toArray(), Iterables.toArray(stateBeforeB2, TimestampedValue.class)); - assertArrayEquals(asList(A2, B2).toArray(), - Iterables.toArray(userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), - TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2).toArray(), + Iterables.toArray( + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), + TimestampedValue.class)); // Add a new value to a new sort key userState.add(A3); - assertArrayEquals(Collections.singletonList(A2).toArray(), + assertArrayEquals( + Collections.singletonList(A2).toArray(), Iterables.toArray(stateBeforeB2, TimestampedValue.class)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), + assertArrayEquals( + asList(A2, B2, A3).toArray(), + Iterables.toArray( + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)), TimestampedValue.class)); userState.asyncClose(); - assertThrows(IllegalStateException.class, () -> userState.readRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); + assertThrows( + IllegalStateException.class, + () -> userState.readRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); } @Test @@ -132,9 +149,13 @@ public void testAdd() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 4), Collections.singletonList(A4), - createOrderedListStateKey("A", 2), asList(A2, B2))); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2))); OrderedListUserState userState = new OrderedListUserState<>( @@ -146,17 +167,20 @@ public void testAdd() throws Exception { // add to an existing timestamp userState.add(B1); - assertArrayEquals(asList(A1, B1, A2, B2, A4).toArray(), + assertArrayEquals( + asList(A1, B1, A2, B2, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // add to a nonexistent timestamp userState.add(A3); - assertArrayEquals(asList(A1, B1, A2, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // add a duplicated value userState.add(B1); - assertArrayEquals(asList(A1, B1, B1, A2, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, B1, B1, A2, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.asyncClose(); @@ -168,11 +192,15 @@ public void testClearRange() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1), - createOrderedListStateKey("A", 4), Collections.singletonList(A4), - createOrderedListStateKey("A", 2), asList(A2, B2), - createOrderedListStateKey("A", 3), Collections.singletonList(A3)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + asList(A1, B1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3))); OrderedListUserState userState = new OrderedListUserState<>( @@ -182,45 +210,49 @@ public void testClearRange() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - Iterable> initStateFrom2To3 = userState.readRange( - Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); + Iterable> initStateFrom2To3 = + userState.readRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); // clear range below the current timestamp range userState.clearRange(Instant.ofEpochMilli(-1), Instant.ofEpochMilli(0)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); - assertArrayEquals(asList(A1, B1, A2, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // clear range above the current timestamp range userState.clearRange(Instant.ofEpochMilli(5), Instant.ofEpochMilli(10)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); - assertArrayEquals(asList(A1, B1, A2, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // clear range that falls inside the current timestamp range userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); - assertArrayEquals(asList(A1, B1, A4).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // clear range that partially covers the current timestamp range userState.clearRange(Instant.ofEpochMilli(3), Instant.ofEpochMilli(5)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); - assertArrayEquals(asList(A1, B1).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // clear range that fully covers the current timestamp range userState.clearRange(Instant.ofEpochMilli(-1), Instant.ofEpochMilli(10)); - assertArrayEquals(asList(A2, B2, A3).toArray(), - Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); + assertArrayEquals( + asList(A2, B2, A3).toArray(), Iterables.toArray(initStateFrom2To3, TimestampedValue.class)); assertThat(userState.read(), is(emptyIterable())); userState.asyncClose(); - assertThrows(IllegalStateException.class, () -> userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); + assertThrows( + IllegalStateException.class, + () -> userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2))); } @Test @@ -228,11 +260,15 @@ public void testClear() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1), - createOrderedListStateKey("A", 4), Collections.singletonList(A4), - createOrderedListStateKey("A", 2), asList(A2, B2), - createOrderedListStateKey("A", 3), Collections.singletonList(A3)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + asList(A1, B1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4), + createOrderedListStateKey("A", 2), + asList(A2, B2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3))); OrderedListUserState userState = new OrderedListUserState<>( @@ -244,7 +280,8 @@ public void testClear() throws Exception { Iterable> stateBeforeClear = userState.read(); userState.clear(); - assertArrayEquals(asList(A1, B1, A2, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, B1, A2, B2, A3, A4).toArray(), Iterables.toArray(stateBeforeClear, TimestampedValue.class)); assertThat(userState.read(), is(emptyIterable())); @@ -257,10 +294,13 @@ public void testAddAndClearRange() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 3), Collections.singletonList(A3), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); OrderedListUserState userState = new OrderedListUserState<>( @@ -274,40 +314,49 @@ public void testAddAndClearRange() throws Exception { userState.add(A2); Iterable> stateBeforeFirstClearRange = userState.read(); userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(3)); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(A1, A3, A4).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A1, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.add(B2); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(A1, B2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // add to an existing timestamp, clear, and then add userState.add(B1); userState.clearRange(Instant.ofEpochMilli(1), Instant.ofEpochMilli(2)); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(B2, A3, A4).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.add(B1); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(B1, B2, A3, A4).toArray(), + assertArrayEquals( + asList(B1, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); // add a duplicated value, clear, and then add userState.add(A3); userState.clearRange(Instant.ofEpochMilli(3), Instant.ofEpochMilli(4)); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(B1, B2, A4).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(B1, B2, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.add(A3); - assertArrayEquals(asList(A1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A2, A3, A4).toArray(), Iterables.toArray(stateBeforeFirstClearRange, TimestampedValue.class)); - assertArrayEquals(asList(B1, B2, A3, A4).toArray(), + assertArrayEquals( + asList(B1, B2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } @@ -316,10 +365,13 @@ public void testAddAndClearRangeAfterClear() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 3), Collections.singletonList(A3), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); OrderedListUserState userState = new OrderedListUserState<>( @@ -334,17 +386,18 @@ public void testAddAndClearRangeAfterClear() throws Exception { assertThat(userState.read(), is(emptyIterable())); userState.add(A1); - assertArrayEquals(Collections.singletonList(A1).toArray(), + assertArrayEquals( + Collections.singletonList(A1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.add(A2); userState.add(A3); - assertArrayEquals(asList(A1, A2, A3).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A1, A2, A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); userState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(3)); - assertArrayEquals(asList(A1, A3).toArray(), - Iterables.toArray(userState.read(), TimestampedValue.class)); + assertArrayEquals( + asList(A1, A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } @Test @@ -352,10 +405,13 @@ public void testNoopAsyncCloseAndRead() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 3), Collections.singletonList(A3), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); { OrderedListUserState userState = new OrderedListUserState<>( @@ -375,9 +431,10 @@ public void testNoopAsyncCloseAndRead() throws Exception { fakeClient, "instructionId", createOrderedListStateKey("A"), - StringUtf8Coder.of()); + StringUtf8Coder.of()); - assertArrayEquals(asList(A1, A3, A4).toArray(), + assertArrayEquals( + asList(A1, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } } @@ -387,10 +444,13 @@ public void testAddAsyncCloseAndRead() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 3), Collections.singletonList(A3), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); { OrderedListUserState userState = new OrderedListUserState<>( @@ -413,7 +473,8 @@ public void testAddAsyncCloseAndRead() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - assertArrayEquals(asList(A1, B1, A2, A3, A4).toArray(), + assertArrayEquals( + asList(A1, B1, A2, A3, A4).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } } @@ -423,11 +484,15 @@ public void testClearRangeAsyncCloseAndRead() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 2), Collections.singletonList(A2), - createOrderedListStateKey("A", 3), Collections.singletonList(A3), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 2), + Collections.singletonList(A2), + createOrderedListStateKey("A", 3), + Collections.singletonList(A3), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); { OrderedListUserState userState = new OrderedListUserState<>( @@ -450,7 +515,8 @@ public void testClearRangeAsyncCloseAndRead() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - assertArrayEquals(Collections.singletonList(A3).toArray(), + assertArrayEquals( + Collections.singletonList(A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } } @@ -460,9 +526,11 @@ public void testAddClearRangeAsyncCloseAndRead() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient( TimestampedValueCoder.of(StringUtf8Coder.of()), - ImmutableMap.of(createOrderedListStateKey("A", 1), Collections.singletonList(A1), - createOrderedListStateKey("A", 4), Collections.singletonList(A4)) - ); + ImmutableMap.of( + createOrderedListStateKey("A", 1), + Collections.singletonList(A1), + createOrderedListStateKey("A", 4), + Collections.singletonList(A4))); { OrderedListUserState userState = new OrderedListUserState<>( @@ -488,12 +556,13 @@ public void testAddClearRangeAsyncCloseAndRead() throws Exception { createOrderedListStateKey("A"), StringUtf8Coder.of()); - assertArrayEquals(Collections.singletonList(A3).toArray(), + assertArrayEquals( + Collections.singletonList(A3).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class)); } } - private ByteString encode (String...values) throws IOException { + private ByteString encode(String... values) throws IOException { ByteStringOutputStream out = new ByteStringOutputStream(); for (String value : values) { StringUtf8Coder.of().encode(value, out); @@ -523,4 +592,4 @@ private StateKey createOrderedListStateKey(String key, long sort_key) throws IOE .setSortKey(sort_key)) .build(); } -} \ No newline at end of file +}