Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GetVersion more deterministic #1807

Merged
merged 3 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.common;

/**
* SdkFlag represents a flag used to help version the sdk internally to make breaking changes in
* workflow logic.
*/
public enum SdkFlag {
UNSET(0),
/*
* Changes behavior of GetVersion to not yield if no previous call existed in history.
*/
SKIP_YIELD_ON_DEFAULT_VERSION(1),
UNKNOWN(Integer.MAX_VALUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have an unknown flag? Just make the return from GetValue (sic) as Optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UNKNOWN is not the same as an empty optional

Copy link
Member

@cretz cretz Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm saying is it shouldn't be the same. You should only have enumerates for real flags IMO. There's no reason to have all but one enumerate be a real flag IMO. Make it clear to the caller that the flag is unknown, not just that it happens to have another name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning UNKNOWN is clear to the caller that the flags meaning is not know. This is how we handle it in the other SDKs that implement flags as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eek. I hope we at least don't in TS, Python, and .NET. All those languages have a good way of representing an absent enum without having to work around that some values of the enum are representable in history and some aren't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it was an absent enum I would use optional, but it really is the lack of knowledge about how to handle a value. I would rather leave it as is in Java since I think it is clearer and more consistent with other SDKs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree, but y'all's design choice here. At least it's internal.


private final int value;

SdkFlag(int value) {
this.value = value;
}

public boolean compare(int i) {
return value == i;
}

public static SdkFlag getValue(int id) {
SdkFlag[] as = SdkFlag.values();
for (int i = 0; i < as.length; i++) {
if (as[i].compare(id)) return as[i];
}
return SdkFlag.UNKNOWN;
}

public int getValue() {
return value;
}
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.common;

import io.temporal.workflow.Functions;
import java.util.EnumSet;

/** Represents all the flags that are currently set in a workflow execution. */
public final class SdkFlags {
private final boolean supportSdkMetadata;
private final Functions.Func<Boolean> replaying;
// Flags that have been received from the server or have not been sent yet.
private final EnumSet<SdkFlag> sdkFlags = EnumSet.noneOf(SdkFlag.class);
// Flags that have been set this WFT that have not been sent to the server.
// Keep track of them separately, so we know what to send to the server.
private final EnumSet<SdkFlag> unsentSdkFlags = EnumSet.noneOf(SdkFlag.class);

public SdkFlags(boolean supportSdkMetadata, Functions.Func<Boolean> replaying) {
this.supportSdkMetadata = supportSdkMetadata;
this.replaying = replaying;
}

/**
* Marks a flag as usable regardless of replay status.
*
* @return True, as long as the server supports SDK flags
*/
public boolean setSdkFlag(SdkFlag flag) {
if (!supportSdkMetadata) {
return false;
}
sdkFlags.add(flag);
return true;
}

/**
* @return True if this flag may currently be used.
*/
public boolean tryUseSdkFlag(SdkFlag flag) {
if (!supportSdkMetadata) {
return false;
}

if (!replaying.apply()) {
sdkFlags.add(flag);
unsentSdkFlags.add(flag);
return true;
} else {
return sdkFlags.contains(flag);
}
}

/**
* @return All flags set since the last call to takeNewSdkFlags.
*/
public EnumSet<SdkFlag> takeNewSdkFlags() {
EnumSet<SdkFlag> result = unsentSdkFlags.clone();
unsentSdkFlags.clear();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.ExecuteActivityParameters;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.LocalActivityCallback;
Expand Down Expand Up @@ -269,8 +270,9 @@ void mutableSideEffect(
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change
* @param callback used to return version
* @return True if the identifier is not present in history
*/
void getVersion(
boolean getVersion(
String changeId,
int minSupported,
int maxSupported,
Expand Down Expand Up @@ -382,4 +384,9 @@ void getVersion(

/** Updates or inserts search attributes used to index workflows. */
void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes);

/**
* @return true if this flag may currently be used.
*/
boolean tryUseSdkFlag(SdkFlag flag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.*;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.workflow.Functions;
Expand Down Expand Up @@ -240,6 +241,11 @@ public boolean isReplaying() {
return workflowStateMachines.isReplaying();
}

@Override
public boolean tryUseSdkFlag(SdkFlag flag) {
return workflowStateMachines.tryUseSdkFlag(flag);
}

@Override
public Functions.Proc1<RuntimeException> newTimer(
Duration delay, Functions.Proc1<RuntimeException> callback) {
Expand Down Expand Up @@ -290,12 +296,12 @@ public void mutableSideEffect(
}

@Override
public void getVersion(
public boolean getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback) {
workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import io.temporal.api.protocol.v1.Message;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.internal.Config;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.common.UpdateMessage;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.StatesMachinesCallback;
Expand Down Expand Up @@ -90,13 +92,16 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {

private final ReplayWorkflowExecutor replayWorkflowExecutor;

private final GetSystemInfoResponse.Capabilities capabilities;

ReplayWorkflowRunTaskHandler(
String namespace,
ReplayWorkflow workflow,
PollWorkflowTaskQueueResponseOrBuilder workflowTask,
SingleWorkerOptions workerOptions,
Scope metricsScope,
LocalActivityDispatcher localActivityDispatcher) {
LocalActivityDispatcher localActivityDispatcher,
GetSystemInfoResponse.Capabilities capabilities) {
HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
throw new IllegalArgumentException(
Expand All @@ -107,7 +112,8 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
this.localActivityDispatcher = localActivityDispatcher;
this.workflow = workflow;

this.workflowStateMachines = new WorkflowStateMachines(new StatesMachinesCallbackImpl());
this.workflowStateMachines =
new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
String fullReplayDirectQueryType =
workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
this.context =
Expand All @@ -125,6 +131,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
this.localActivityCompletionSink = localActivityCompletionQueue::add;
this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
this.capabilities = capabilities;
}

@Override
Expand Down Expand Up @@ -159,6 +166,11 @@ public WorkflowTaskResult handleWorkflowTask(
processLocalActivityRequests(wftHearbeatDeadline);
List<Command> commands = workflowStateMachines.takeCommands();
List<Message> messages = workflowStateMachines.takeMessages();
EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
List<Integer> newSdkFlags = new ArrayList<>(newFlags.size());
for (SdkFlag flag : newFlags) {
newSdkFlags.add(flag.getValue());
}
if (context.isWorkflowMethodCompleted()) {
// it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
// and invalidation
Expand All @@ -175,6 +187,7 @@ public WorkflowTaskResult handleWorkflowTask(
.setFinalCommand(context.isWorkflowMethodCompleted())
.setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
.setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
.setSdkFlags(newSdkFlags)
.build();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.sdk.v1.WorkflowTaskCompletedMetadata;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.*;
Expand Down Expand Up @@ -235,6 +236,13 @@ private Result createCompletedWFTRequest(
}
completedRequest.setStickyAttributes(attributes);
}
if (!result.getSdkFlags().isEmpty()) {
completedRequest =
completedRequest.setSdkMetadata(
WorkflowTaskCompletedMetadata.newBuilder()
.addAllLangUsedFlags(result.getSdkFlags())
.build());
}
return new Result(
workflowType,
completedRequest.build(),
Expand Down Expand Up @@ -383,7 +391,13 @@ private WorkflowRunTaskHandler createStatefulHandler(
}
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
return new ReplayWorkflowRunTaskHandler(
namespace, workflow, workflowTask, options, metricsScope, localActivityDispatcher);
namespace,
workflow,
workflowTask,
options,
metricsScope,
localActivityDispatcher,
service.getServerCapabilities().get());
}

private void resetStickyTaskQueue(WorkflowExecution execution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static final class Builder {
private Map<String, WorkflowQueryResult> queryResults;
private boolean forceWorkflowTask;
private int nonfirstLocalActivityAttempts;
private List<Integer> sdkFlags;

public Builder setCommands(List<Command> commands) {
this.commands = commands;
Expand Down Expand Up @@ -71,14 +72,20 @@ public Builder setNonfirstLocalActivityAttempts(int nonfirstLocalActivityAttempt
return this;
}

public Builder setSdkFlags(List<Integer> sdkFlags) {
this.sdkFlags = sdkFlags;
return this;
}

public WorkflowTaskResult build() {
return new WorkflowTaskResult(
commands == null ? Collections.emptyList() : commands,
messages == null ? Collections.emptyList() : messages,
queryResults == null ? Collections.emptyMap() : queryResults,
finalCommand,
forceWorkflowTask,
nonfirstLocalActivityAttempts);
nonfirstLocalActivityAttempts,
sdkFlags == null ? Collections.emptyList() : sdkFlags);
}
}

Expand All @@ -88,14 +95,16 @@ public WorkflowTaskResult build() {
private final Map<String, WorkflowQueryResult> queryResults;
private final boolean forceWorkflowTask;
private final int nonfirstLocalActivityAttempts;
private final List<Integer> sdkFlags;

private WorkflowTaskResult(
List<Command> commands,
List<Message> messages,
Map<String, WorkflowQueryResult> queryResults,
boolean finalCommand,
boolean forceWorkflowTask,
int nonfirstLocalActivityAttempts) {
int nonfirstLocalActivityAttempts,
List<Integer> sdkFlags) {
this.commands = commands;
this.messages = messages;
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
Expand All @@ -105,6 +114,7 @@ private WorkflowTaskResult(
this.queryResults = queryResults;
this.finalCommand = finalCommand;
this.forceWorkflowTask = forceWorkflowTask;
this.sdkFlags = sdkFlags;
}

public List<Command> getCommands() {
Expand All @@ -131,4 +141,8 @@ public boolean isForceWorkflowTask() {
public int getNonfirstLocalActivityAttempts() {
return nonfirstLocalActivityAttempts;
}

public List<Integer> getSdkFlags() {
return sdkFlags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,12 @@ private VersionStateMachine(
this.stateMachineSink = stateMachineSink;
}

public void getVersion(
public State getVersion(
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
return ism.getState();
}

public void handleNonMatchingEvent(HistoryEvent event) {
Expand Down
Loading
Loading