Skip to content

Commit

Permalink
Fix an edge case when async called after clear. Minor fix based on re…
Browse files Browse the repository at this point in the history
…viwer comments.
  • Loading branch information
shunping committed Jun 11, 2024
1 parent b481880 commit 65aef1a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public ReadableState<Boolean> readLater() {
@Override
public Iterable<TimestampedValue<T>> read() {
return readRange(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
this.timestampedValueCoder);

// Make a snapshot of the current pendingRemoves and use them to filter persistent values.
// The values of pendingRemoves are kept, so that they will still be accessible in
// The values of pendingRemoves are copied, so that they will still be accessible in
// pre-existing iterables even after a sort key is removed.
TreeRangeSet<Instant> pendingRemovesSnapshot = TreeRangeSet.create(pendingRemoves);
Iterable<TimestampedValue<T>> persistentValuesAfterRemoval =
Expand Down Expand Up @@ -263,6 +263,12 @@ public void clear() {
// so that they will still be accessible in pre-existing iterables even after the state is
// cleared.
pendingRemoves = TreeRangeSet.create();
pendingRemoves.add(
Range.range(
Instant.ofEpochMilli(Long.MIN_VALUE),
BoundType.CLOSED,
Instant.ofEpochMilli(Long.MAX_VALUE),
BoundType.OPEN));
pendingAdds.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,25 +178,7 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild

switch (request.getRequestCase()) {
case GET:
if (key.getTypeCase() != TypeCase.ORDERED_LIST_USER_STATE) {
List<ByteString> byteStrings =
data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY));
int block = 0;
if (!request.getGet().getContinuationToken().isEmpty()) {
block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
}
ByteString returnBlock = byteStrings.get(block);
ByteString continuationToken = ByteString.EMPTY;
if (byteStrings.size() > block + 1) {
continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1));
}
response =
StateResponse.newBuilder()
.setGet(
StateGetResponse.newBuilder()
.setData(returnBlock)
.setContinuationToken(continuationToken));
} else {
if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) {
long start = key.getOrderedListUserState().getRange().getStart();
long end = key.getOrderedListUserState().getRange().getEnd();

Expand Down Expand Up @@ -271,13 +253,29 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
StateGetResponse.newBuilder()
.setData(returnBlock)
.setContinuationToken(continuationToken));
} else {
List<ByteString> byteStrings =
data.getOrDefault(request.getStateKey(), Collections.singletonList(ByteString.EMPTY));
int block = 0;
if (!request.getGet().getContinuationToken().isEmpty()) {
block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
}
ByteString returnBlock = byteStrings.get(block);
ByteString continuationToken = ByteString.EMPTY;
if (byteStrings.size() > block + 1) {
continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1));
}
response =
StateResponse.newBuilder()
.setGet(
StateGetResponse.newBuilder()
.setData(returnBlock)
.setContinuationToken(continuationToken));
}
break;

case CLEAR:
if (key.getTypeCase() != TypeCase.ORDERED_LIST_USER_STATE) {
data.remove(request.getStateKey());
} else {
if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) {
OrderedListRange r = request.getStateKey().getOrderedListUserState().getRange();
StateKey.Builder stateKeyWithoutRange = request.getStateKey().toBuilder();
stateKeyWithoutRange.getOrderedListUserStateBuilder().clearRange();
Expand All @@ -293,16 +291,14 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
data.remove(keyBuilder.build());
orderedListSortKeysFromStateKey.get(stateKeyWithoutRange.build()).remove(l);
}
} else {
data.remove(request.getStateKey());
}
response = StateResponse.newBuilder().setClear(StateClearResponse.getDefaultInstance());
break;

case APPEND:
if (key.getTypeCase() != TypeCase.ORDERED_LIST_USER_STATE) {
List<ByteString> previousValue =
data.computeIfAbsent(request.getStateKey(), (unused) -> new ArrayList<>());
previousValue.add(request.getAppend().getData());
} else {
if (key.getTypeCase() == TypeCase.ORDERED_LIST_USER_STATE) {
InputStream inStream = request.getAppend().getData().newInput();
TimestampedValueCoder<byte[]> coder = TimestampedValueCoder.of(ByteArrayCoder.of());
try {
Expand Down Expand Up @@ -333,6 +329,10 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
} catch (IOException ex) {
throw new RuntimeException(ex);
}
} else {
List<ByteString> previousValue =
data.computeIfAbsent(request.getStateKey(), (unused) -> new ArrayList<>());
previousValue.add(request.getAppend().getData());
}
response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,45 @@ public void testAddClearRangeAsyncCloseAndRead() throws Exception {
}
}

@Test
public void testClearAsyncCloseAndRead() throws Exception {
FakeBeamFnStateClient fakeClient =
new FakeBeamFnStateClient(
timestampedValueCoder,
ImmutableMap.of(
createOrderedListStateKey("A", 1),
Collections.singletonList(A1),
createOrderedListStateKey("A", 2),
Collections.singletonList(A2),
createOrderedListStateKey("A", 3),
Collections.singletonList(A3),
createOrderedListStateKey("A", 4),
Collections.singletonList(A4)));
{
OrderedListUserState<String> userState =
new OrderedListUserState<>(
Caches.noop(),
fakeClient,
"instructionId",
createOrderedListStateKey("A"),
StringUtf8Coder.of());

userState.clear();
userState.asyncClose();
}
{
OrderedListUserState<String> userState =
new OrderedListUserState<>(
Caches.noop(),
fakeClient,
"instructionId",
createOrderedListStateKey("A"),
StringUtf8Coder.of());

assertThat(userState.read(), is(emptyIterable()));
}
}

@Test
public void testOperationsDuringNavigatingIterable() throws Exception {
FakeBeamFnStateClient fakeClient =
Expand Down

0 comments on commit 65aef1a

Please sign in to comment.