Skip to content

Commit

Permalink
Refactor some variable names. Add a notes on the order of pendingAdds…
Browse files Browse the repository at this point in the history
… and pendingRemoves during async_close()
  • Loading branch information
shunping committed Jun 12, 2024
1 parent 65aef1a commit c158b32
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,17 @@
*/
public class OrderedListUserState<T> {
private final BeamFnStateClient beamFnStateClient;
private final StateRequest request;
private final StateRequest requestTemplate;
private final TimestampedValueCoder<T> timestampedValueCoder;
// Pending updates to persistent storage
// (a) The elements in pendingAdds are the ones that should be added to the persistent storage
// during the next async_close(). It doesn't include the ones that are removed by
// clear_range() or clear() after the last add.
// (b) The elements in pendingRemoves are the sort keys that should be removed from the persistent
// storage.
// (c) When syncing local copy with persistent storage, pendingRemoves are performed first and
// then pendingAdds. Switching this order may result in wrong results, because a value added
// later could be removed from an earlier clear.
private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();

Expand Down Expand Up @@ -108,10 +116,11 @@ public Object structuralValue(TimestampedValue<T> value) {
}

@Override
public void encode(TimestampedValue<T> windowedElem, OutputStream outStream)
public void encode(TimestampedValue<T> timestampedValue, OutputStream outStream)
throws IOException {
internalKvCoder.encode(
KV.of(windowedElem.getTimestamp().getMillis(), windowedElem.getValue()), outStream);
KV.of(timestampedValue.getTimestamp().getMillis(), timestampedValue.getValue()),
outStream);
}

@Override
Expand Down Expand Up @@ -159,15 +168,15 @@ public OrderedListUserState(
stateKey);
this.beamFnStateClient = beamFnStateClient;
this.timestampedValueCoder = TimestampedValueCoder.of(valueCoder);
this.request =
this.requestTemplate =
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
}

public void add(TimestampedValue<T> value) {
checkState(
!isClosed,
"OrderedList user state is no longer usable because it is closed for %s",
request.getStateKey());
requestTemplate.getStateKey());
Instant timestamp = value.getTimestamp();
pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
pendingAdds.get(timestamp).add(value.getValue());
Expand All @@ -177,7 +186,7 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
checkState(
!isClosed,
"OrderedList user state is no longer usable because it is closed for %s",
request.getStateKey());
requestTemplate.getStateKey());

// Store pendingAdds whose sort key is in the query range and values are truncated by the
// current size. The values (collections) of pendingAdds are kept, so that they will still be
Expand All @@ -195,7 +204,7 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
Iterable<TimestampedValue<T>> valuesInRange = Iterables.concat(pendingAddsInRange);

if (!isCleared) {
StateRequest.Builder getRequestBuilder = this.request.toBuilder();
StateRequest.Builder getRequestBuilder = this.requestTemplate.toBuilder();
getRequestBuilder
.getStateKeyBuilder()
.getOrderedListUserStateBuilder()
Expand Down Expand Up @@ -231,7 +240,7 @@ public Iterable<TimestampedValue<T>> read() {
checkState(
!isClosed,
"OrderedList user state is no longer usable because it is closed for %s",
request.getStateKey());
requestTemplate.getStateKey());

return readRange(Instant.ofEpochMilli(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE));
}
Expand All @@ -240,7 +249,7 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
checkState(
!isClosed,
"OrderedList user state is no longer usable because it is closed for %s",
request.getStateKey());
requestTemplate.getStateKey());

// Remove items (in a collection) in the specific range from pendingAdds.
// The old values of the removed sub map are kept, so that they will still be accessible in
Expand All @@ -256,7 +265,7 @@ public void clear() {
checkState(
!isClosed,
"OrderedList user state is no longer usable because it is closed for %s",
request.getStateKey());
requestTemplate.getStateKey());
isCleared = true;
// Create a new object for pendingRemoves and clear the mappings in pendingAdds.
// The entire tree range set of pendingRemoves and the old values in the pendingAdds are kept,
Expand All @@ -277,7 +286,7 @@ public void asyncClose() throws Exception {

if (!pendingRemoves.isEmpty()) {
for (Range<Instant> r : pendingRemoves.asRanges()) {
StateRequest.Builder stateRequest = this.request.toBuilder();
StateRequest.Builder stateRequest = this.requestTemplate.toBuilder();
stateRequest.setClear(StateClearRequest.newBuilder().build());
stateRequest
.getStateKeyBuilder()
Expand Down Expand Up @@ -307,7 +316,7 @@ public void asyncClose() throws Exception {
}
}
}
StateRequest.Builder stateRequest = this.request.toBuilder();
StateRequest.Builder stateRequest = this.requestTemplate.toBuilder();
stateRequest.getAppendBuilder().setData(outStream.toByteString());

CompletableFuture<StateResponse> response = beamFnStateClient.handle(stateRequest);
Expand Down

0 comments on commit c158b32

Please sign in to comment.