Skip to content

Commit

Permalink
Respond to PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 15, 2023
1 parent 75646f5 commit 354c471
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public enum SdkFlag {
UNSET(0),
/*
* Changes behaviour of GetVersion to not yield if no previous call existed in history.
* Changes behavior of GetVersion to not yield if no previous call existed in history.
*/
SKIP_YIELD_ON_DEFAULT_VERSION(1),
UNKNOWN(Integer.MAX_VALUE);
Expand All @@ -38,14 +38,14 @@ public enum SdkFlag {
this.value = value;
}

public boolean Compare(int i) {
public boolean compare(int i) {
return value == i;
}

public static SdkFlag GetValue(int _id) {
public static SdkFlag getValue(int id) {
SdkFlag[] As = SdkFlag.values();
for (int i = 0; i < As.length; i++) {
if (As[i].Compare(_id)) return As[i];
if (As[i].compare(id)) return As[i];
}
return SdkFlag.UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,21 @@

package io.temporal.internal.common;

import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.workflow.Functions;
import java.util.ArrayList;
import java.util.List;
import java.util.EnumSet;

/** Represents all the flags that are currently set in a workflow execution. */
public final class SdkFlags {
private final GetSystemInfoResponse.Capabilities capabilities;
private final boolean supportSdkMetadata;
private final Functions.Func<Boolean> replaying;
// Flags that have been received from the server
private final List<SdkFlag> sdkFlags = new ArrayList<>();
// Flags that have been received from the server or have not been sent yet.
private final EnumSet<SdkFlag> sdkFlags = EnumSet.noneOf(SdkFlag.class);
// Flags that have been set this WFT that have not been sent to the server.
// Keep track of them separately, so we know what to send to the server.
private final List<SdkFlag> unsentSdkFlags = new ArrayList<>();
private final EnumSet<SdkFlag> unsentSdkFlags = EnumSet.noneOf(SdkFlag.class);

public SdkFlags(
GetSystemInfoResponse.Capabilities capabilities, Functions.Func<Boolean> replaying) {
this.capabilities = capabilities;
public SdkFlags(boolean supportSdkMetadata, Functions.Func<Boolean> replaying) {
this.supportSdkMetadata = supportSdkMetadata;
this.replaying = replaying;
}

Expand All @@ -47,7 +44,7 @@ public SdkFlags(
* @return True, as long as the server supports SDK flags
*/
public boolean setSdkFlag(SdkFlag flag) {
if (!capabilities.getSdkMetadata()) {
if (!supportSdkMetadata) {
return false;
}
sdkFlags.add(flag);
Expand All @@ -58,27 +55,24 @@ public boolean setSdkFlag(SdkFlag flag) {
* @return True if this flag may currently be used.
*/
public boolean tryUseSdkFlag(SdkFlag flag) {
if (!capabilities.getSdkMetadata()) {
if (!supportSdkMetadata) {
return false;
}

if (!replaying.apply()) {
sdkFlags.add(flag);
unsentSdkFlags.add(flag);
return true;
} else {
return unsentSdkFlags.contains(flag) || sdkFlags.contains(flag);
return sdkFlags.contains(flag);
}
}

/***
/**
* @return All flags set since the last call to takeNewSdkFlags.
*/
public List<Integer> takeNewSdkFlags() {
List<Integer> result = new ArrayList<>(unsentSdkFlags.size());
for (SdkFlag flag : unsentSdkFlags) {
sdkFlags.add(flag);
result.add(flag.getValue());
}
public EnumSet<SdkFlag> takeNewSdkFlags() {
EnumSet<SdkFlag> result = unsentSdkFlags.clone();
unsentSdkFlags.clear();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.internal.Config;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.common.UpdateMessage;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.StatesMachinesCallback;
Expand Down Expand Up @@ -165,7 +166,11 @@ public WorkflowTaskResult handleWorkflowTask(
processLocalActivityRequests(wftHearbeatDeadline);
List<Command> commands = workflowStateMachines.takeCommands();
List<Message> messages = workflowStateMachines.takeMessages();
List<Integer> newSdkFlags = workflowStateMachines.takeNewSdkFlags();
EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
List<Integer> newSdkFlags = new ArrayList<>(newFlags.size());
for (SdkFlag flag : newFlags) {
newSdkFlags.add(flag.getValue());
}
if (context.isWorkflowMethodCompleted()) {
// it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
// and invalidation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private Result createCompletedWFTRequest(
}
completedRequest.setStickyAttributes(attributes);
}
if (result.getSdkFlags().size() > 0) {
if (!result.getSdkFlags().isEmpty()) {
completedRequest =
completedRequest.setSdkMetadata(
WorkflowTaskCompletedMetadata.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public WorkflowStateMachines(
this.commandSink = cancellableCommands::add;
this.stateMachineSink = stateMachineSink;
this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
this.flags = new SdkFlags(capabilities, this::isReplaying);
this.flags = new SdkFlags(capabilities.getSdkMetadata(), this::isReplaying);
}

@VisibleForTesting
Expand All @@ -179,8 +179,7 @@ public WorkflowStateMachines(
this.commandSink = cancellableCommands::add;
this.stateMachineSink = stateMachineSink;
this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
this.flags =
new SdkFlags(GetSystemInfoResponse.Capabilities.newBuilder().build(), this::isReplaying);
this.flags = new SdkFlags(false, this::isReplaying);
}

// TODO revisit and potentially remove workflowTaskStartedEventId at all from the state machines.
Expand Down Expand Up @@ -310,7 +309,7 @@ private void handleSingleEventLookahead(HistoryEvent event) {
WorkflowTaskCompletedEventAttributes completedEvent =
event.getWorkflowTaskCompletedEventAttributes();
for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
SdkFlag sdkFlag = SdkFlag.GetValue(flag);
SdkFlag sdkFlag = SdkFlag.getValue(flag);
if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
throw new IllegalArgumentException("Unknown SDK flag:" + flag);
}
Expand Down Expand Up @@ -563,9 +562,9 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
}

/**
* @return List of all new flags set since the last call
* @return Set of all new flags set since the last call
*/
public List<Integer> takeNewSdkFlags() {
public EnumSet<SdkFlag> takeNewSdkFlags() {
return flags.takeNewSdkFlags();
}

Expand Down Expand Up @@ -924,7 +923,6 @@ public boolean getVersion(
versions.computeIfAbsent(
changeId,
(idKey) -> {
System.out.println("Creating VersionStateMachine");
return VersionStateMachine.newInstance(
changeId, this::isReplaying, commandSink, stateMachineSink);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testAsyncActivityRetry() {
}

@Test
public void testAsyncActivityRetry_replay() throws Exception {
public void testAsyncActivityRetryReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testAsyncActivityRetryHistory.json", TestAsyncActivityRetry.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testChildWorkflowRetry() {
* compatible with the client that supports the server side retry.
*/
@Test
public void testChildWorkflowRetry_replay() throws Exception {
public void testChildWorkflowRetryReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testChildWorkflowRetryHistory.json", TestChildWorkflowRetryWorkflow.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testGetVersionAndCancelTimer() {
}

@Test
public void testGetVersionAndCancelTimer_replay() throws Exception {
public void testGetVersionAndCancelTimerReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"get_version_after_scope_cancellation.json", testWorkflowRule.getWorker());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testGetVersionInSignal() {
}

@Test
public void testGetVersionInSignal_replay() throws Exception {
public void testGetVersionInSignalReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetVersionInSignalHistory.json", TestGetVersionInSignal.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetVersionDefaultMultithreading() {
}

@Test
public void testGetVersionDefaultMultithreading_replay() throws Exception {
public void testGetVersionDefaultMultithreadingReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetVersionDefaultMultithreadingHistory.json", TestGetVersionWorkflowImpl.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testGetVersion() {
}

@Test
public void testGetVersion_replay() throws Exception {
public void testGetVersionReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetVersionHistory.json", TestGetVersionWorkflowImpl.class);
}
Expand Down

0 comments on commit 354c471

Please sign in to comment.