diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 1381b605ff..d515817b29 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -748,9 +748,19 @@ public DynamicUpdateHandler getHandler() { R sideEffect(Class resultClass, Type resultType, Func func); + R sideEffect(Class resultClass, Type resultType, Func func, SideEffectOptions options); + R mutableSideEffect( String id, Class resultClass, Type resultType, BiPredicate updated, Func func); + R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options); + int getVersion(String changeId, int minSupported, int maxSupported); void continueAsNew(ContinueAsNewInput input); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index d0fb32a32d..9d99d4c78b 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -3,7 +3,9 @@ import com.uber.m3.tally.Scope; import io.temporal.common.SearchAttributeUpdate; import io.temporal.workflow.Functions.Func; +import io.temporal.workflow.MutableSideEffectOptions; import io.temporal.workflow.Promise; +import io.temporal.workflow.SideEffectOptions; import io.temporal.workflow.TimerOptions; import java.lang.reflect.Type; import java.time.Duration; @@ -88,12 +90,29 @@ public R sideEffect(Class resultClass, Type resultType, Func func) { return next.sideEffect(resultClass, resultType, func); } + @Override + public R sideEffect( + Class resultClass, Type resultType, Func func, SideEffectOptions options) { + return next.sideEffect(resultClass, resultType, func, options); + } + @Override public R mutableSideEffect( String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { return next.mutableSideEffect(id, resultClass, resultType, updated, func); } + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + return next.mutableSideEffect(id, resultClass, resultType, updated, func, options); + } + @Override public int getVersion(String changeId, int minSupported, int maxSupported) { return next.getVersion(changeId, minSupported, maxSupported); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 6006ee19c9..19a488e775 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -224,9 +224,13 @@ Functions.Proc1 newTimer( * executing operations that rely on non-global dependencies and can fail. * * @param func function that is called once to return a value. + * @param userMetadata user metadata to be associated with the side effect. * @param callback function that accepts the result of the side effect. */ - void sideEffect(Func> func, Functions.Proc1> callback); + void sideEffect( + Func> func, + UserMetadata userMetadata, + Functions.Proc1> callback); /** * {@code mutableSideEffect} is similar to {@code sideEffect} in allowing calls of @@ -247,6 +251,7 @@ Functions.Proc1 newTimer( * * @param id id of the side effect call. It links multiple calls together. Calls with different * ids are completely independent. + * @param userMetadata user metadata to attach to the marker event. * @param func function that gets as input a result of a previous {@code mutableSideEffect} call. * The function executes its business logic (like checking config value) and if value didn't * change returns {@link Optional#empty()}. If value has changed and needs to be recorded in @@ -256,6 +261,7 @@ Functions.Proc1 newTimer( */ void mutableSideEffect( String id, + UserMetadata userMetadata, Func1, Optional> func, Functions.Proc1> callback); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 3f0c2bcc95..dd1844a316 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -315,16 +315,19 @@ private void handleTimerCallback(Functions.Proc1 callback, His @Override public void sideEffect( - Func> func, Functions.Proc1> callback) { - workflowStateMachines.sideEffect(func, callback); + Func> func, + UserMetadata metadata, + Functions.Proc1> callback) { + workflowStateMachines.sideEffect(func, metadata, callback); } @Override public void mutableSideEffect( String id, + UserMetadata metadata, Func1, Optional> func, Functions.Proc1> callback) { - workflowStateMachines.mutableSideEffect(id, func, callback); + workflowStateMachines.mutableSideEffect(id, metadata, func, callback); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/MutableSideEffectStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/MutableSideEffectStateMachine.java index df6d1de5d1..238924b27d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/MutableSideEffectStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/MutableSideEffectStateMachine.java @@ -8,6 +8,7 @@ import io.temporal.api.enums.v1.EventType; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.MarkerRecordedEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.common.converter.StdConverterBackwardsCompatAdapter; import io.temporal.workflow.Functions; @@ -24,6 +25,7 @@ final class MutableSideEffectStateMachine { static final String MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect"; private final String id; + private UserMetadata metadata; private final Functions.Func replaying; private final Functions.Proc1 commandSink; @@ -174,7 +176,8 @@ State createMarker() { .setMarkerName(MUTABLE_SIDE_EFFECT_MARKER_NAME) .putAllDetails(details) .build(); - addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes)); + addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes, metadata)); + metadata = null; // only used once currentSkipCount = 0; return State.MARKER_COMMAND_CREATED; } @@ -244,18 +247,22 @@ void cancelCommandNotifyCachedResult() { /** Creates new MutableSideEffectStateMachine */ public static MutableSideEffectStateMachine newInstance( String id, + UserMetadata metadata, Functions.Func replaying, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { - return new MutableSideEffectStateMachine(id, replaying, commandSink, stateMachineSink); + return new MutableSideEffectStateMachine( + id, metadata, replaying, commandSink, stateMachineSink); } private MutableSideEffectStateMachine( String id, + UserMetadata metadata, Functions.Func replaying, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { this.id = Objects.requireNonNull(id); + this.metadata = metadata; this.replaying = Objects.requireNonNull(replaying); this.commandSink = Objects.requireNonNull(commandSink); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/SideEffectStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/SideEffectStateMachine.java index 4c85033448..44034a7497 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/SideEffectStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/SideEffectStateMachine.java @@ -6,6 +6,7 @@ import io.temporal.api.enums.v1.CommandType; import io.temporal.api.enums.v1.EventType; import io.temporal.api.history.v1.MarkerRecordedEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.workflow.Functions; import java.util.HashMap; import java.util.Map; @@ -33,6 +34,7 @@ enum State { static final String MARKER_DATA_KEY = "data"; static final String SIDE_EFFECT_MARKER_NAME = "SideEffect"; + private UserMetadata metadata; private final Functions.Proc1> callback; private final Functions.Func> func; private final Functions.Func replaying; @@ -72,26 +74,30 @@ enum State { /** * Creates new SideEffect Marker * + * @param metadata user metadata to attach to the side effect marker. * @param func used to produce side effect value. null if replaying. * @param callback returns side effect value or failure * @param commandSink callback to send commands to */ public static void newInstance( + UserMetadata metadata, Functions.Func replaying, Functions.Func> func, Functions.Proc1> callback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { - new SideEffectStateMachine(replaying, func, callback, commandSink, stateMachineSink); + new SideEffectStateMachine(metadata, replaying, func, callback, commandSink, stateMachineSink); } private SideEffectStateMachine( + UserMetadata metadata, Functions.Func replaying, Functions.Func> func, Functions.Proc1> callback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink); + this.metadata = metadata; this.replaying = replaying; this.func = func; this.callback = callback; @@ -121,11 +127,18 @@ private State createMarkerCommand() { .build(); transitionTo = State.MARKER_COMMAND_CREATED; } - addCommand( + + Command.Builder command = Command.newBuilder() .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER) - .setRecordMarkerCommandAttributes(markerAttributes) - .build()); + .setRecordMarkerCommandAttributes(markerAttributes); + + if (metadata != null) { + command.setUserMetadata(metadata); + metadata = null; + } + + addCommand(command.build()); return transitionTo; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java index fe2c7bd7f2..7a1fa95d93 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java @@ -3,20 +3,27 @@ import io.temporal.api.command.v1.Command; import io.temporal.api.command.v1.RecordMarkerCommandAttributes; import io.temporal.api.enums.v1.CommandType; +import io.temporal.api.sdk.v1.UserMetadata; +import javax.annotation.Nullable; class StateMachineCommandUtils { public static final Command RECORD_MARKER_FAKE_COMMAND = - createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance()); + createRecordMarker(RecordMarkerCommandAttributes.getDefaultInstance(), null); - public static Command createRecordMarker(RecordMarkerCommandAttributes attributes) { - return Command.newBuilder() - .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER) - .setRecordMarkerCommandAttributes(attributes) - .build(); + public static Command createRecordMarker( + RecordMarkerCommandAttributes attributes, @Nullable UserMetadata metadata) { + Command.Builder command = + Command.newBuilder() + .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER) + .setRecordMarkerCommandAttributes(attributes); + if (metadata != null) { + command.setUserMetadata(metadata); + } + return command.build(); } public static Command createFakeMarkerCommand(String markerName) { return createRecordMarker( - RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build()); + RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build(), null); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java index a541d7e81a..560920cac4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java @@ -240,7 +240,7 @@ State createMarkerExecuting() { writeVersionChangeSA = sa != null; RecordMarkerCommandAttributes markerAttributes = VersionMarkerUtils.createMarkerAttributes(changeId, version, writeVersionChangeSA); - Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes); + Command markerCommand = StateMachineCommandUtils.createRecordMarker(markerAttributes, null); addCommand(markerCommand); if (writeVersionChangeSA) { hasWrittenVersionChangeSA = true; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 15efc2add7..eda2b4767d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -1157,9 +1157,12 @@ public Random newRandom() { } public void sideEffect( - Functions.Func> func, Functions.Proc1> callback) { + Functions.Func> func, + UserMetadata userMetadata, + Functions.Proc1> callback) { checkEventLoopExecuting(); SideEffectStateMachine.newInstance( + userMetadata, this::isReplaying, func, (payloads) -> { @@ -1179,6 +1182,7 @@ public void sideEffect( */ public void mutableSideEffect( String id, + UserMetadata userMetadata, Functions.Func1, Optional> func, Functions.Proc1> callback) { checkEventLoopExecuting(); @@ -1187,7 +1191,7 @@ public void mutableSideEffect( id, (idKey) -> MutableSideEffectStateMachine.newInstance( - idKey, this::isReplaying, commandSink, stateMachineSink)); + idKey, userMetadata, this::isReplaying, commandSink, stateMachineSink)); stateMachine.mutableSideEffect( func, (r) -> { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 73b58e0932..ad805e0eb1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1027,6 +1027,15 @@ public Promise newTimer(Duration delay, TimerOptions options) { @Override public R sideEffect(Class resultClass, Type resultType, Func func) { + return sideEffect(resultClass, resultType, func, SideEffectOptions.newBuilder().build()); + } + + @Override + public R sideEffect( + Class resultClass, Type resultType, Func func, SideEffectOptions options) { + @Nullable + UserMetadata userMetadata = + makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext); try { CompletablePromise> result = Workflow.newPromise(); replayContext.sideEffect( @@ -1039,6 +1048,7 @@ public R sideEffect(Class resultClass, Type resultType, Func func) { readOnly = false; } }, + userMetadata, (p) -> runner.executeInWorkflowThread( "side-effect-callback", () -> result.complete(Objects.requireNonNull(p)))); @@ -1054,8 +1064,23 @@ public R sideEffect(Class resultClass, Type resultType, Func func) { @Override public R mutableSideEffect( String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { + return mutableSideEffect( + id, resultClass, resultType, updated, func, MutableSideEffectOptions.newBuilder().build()); + } + + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + @Nullable + UserMetadata userMetadata = + makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext); try { - return mutableSideEffectImpl(id, resultClass, resultType, updated, func); + return mutableSideEffectImpl(id, userMetadata, resultClass, resultType, updated, func); } catch (Exception e) { // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic // behavior. So fail the workflow task by throwing an Error. @@ -1064,11 +1089,17 @@ public R mutableSideEffect( } private R mutableSideEffectImpl( - String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { + String id, + UserMetadata metadata, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func) { CompletablePromise> result = Workflow.newPromise(); AtomicReference unserializedResult = new AtomicReference<>(); replayContext.mutableSideEffect( id, + metadata, (storedBinary) -> { Optional stored = storedBinary.map( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 42e50c9a3d..099b2f9b48 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -525,6 +525,12 @@ public static R sideEffect(Class resultClass, Type resultType, Func fu return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func); } + public static R sideEffect( + Class resultClass, Type resultType, Func func, SideEffectOptions options) { + assertNotReadOnly("side effect"); + return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func, options); + } + public static R mutableSideEffect( String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { assertNotReadOnly("mutable side effect"); @@ -532,6 +538,18 @@ public static R mutableSideEffect( .mutableSideEffect(id, resultClass, resultType, updated, func); } + public static R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + assertNotReadOnly("mutable side effect"); + return getWorkflowOutboundInterceptor() + .mutableSideEffect(id, resultClass, resultType, updated, func, options); + } + public static int getVersion(String changeId, int minSupported, int maxSupported) { assertNotReadOnly("get version"); return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/MutableSideEffectOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/MutableSideEffectOptions.java new file mode 100644 index 0000000000..f475d9aa2c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/workflow/MutableSideEffectOptions.java @@ -0,0 +1,87 @@ +package io.temporal.workflow; + +import io.temporal.common.Experimental; +import java.util.Objects; + +/** MutableSideEffectOptions is used to specify options for a side effect. */ +public class MutableSideEffectOptions { + + public static Builder newBuilder() { + return new MutableSideEffectOptions.Builder(); + } + + public static Builder newBuilder(MutableSideEffectOptions options) { + return new MutableSideEffectOptions.Builder(options); + } + + public static MutableSideEffectOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final MutableSideEffectOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = MutableSideEffectOptions.newBuilder().build(); + } + + public static final class Builder { + private String summary; + + private Builder() {} + + private Builder(MutableSideEffectOptions options) { + if (options == null) { + return; + } + this.summary = options.summary; + } + + /** + * Single-line fixed summary for this mutable side effect that will appear in UI/CLI. This can + * be in single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public MutableSideEffectOptions.Builder setSummary(String summary) { + this.summary = summary; + return this; + } + + public MutableSideEffectOptions build() { + return new MutableSideEffectOptions(summary); + } + } + + private final String summary; + + private MutableSideEffectOptions(String summary) { + this.summary = summary; + } + + public String getSummary() { + return summary; + } + + public MutableSideEffectOptions.Builder toBuilder() { + return new MutableSideEffectOptions.Builder(this); + } + + @Override + public String toString() { + return "MutableSideEffectOptions{" + "summary='" + summary + '\'' + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MutableSideEffectOptions that = (MutableSideEffectOptions) o; + return Objects.equals(summary, that.summary); + } + + @Override + public int hashCode() { + return Objects.hash(summary); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/SideEffectOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/SideEffectOptions.java new file mode 100644 index 0000000000..9a56d2ebb3 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/workflow/SideEffectOptions.java @@ -0,0 +1,87 @@ +package io.temporal.workflow; + +import io.temporal.common.Experimental; +import java.util.Objects; + +/** SideEffectOptions is used to specify options for a side effect. */ +public class SideEffectOptions { + + public static Builder newBuilder() { + return new SideEffectOptions.Builder(); + } + + public static Builder newBuilder(SideEffectOptions options) { + return new SideEffectOptions.Builder(options); + } + + public static SideEffectOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final SideEffectOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = SideEffectOptions.newBuilder().build(); + } + + public static final class Builder { + private String summary; + + private Builder() {} + + private Builder(SideEffectOptions options) { + if (options == null) { + return; + } + this.summary = options.summary; + } + + /** + * Single-line fixed summary for this side effect that will appear in UI/CLI. This can be in + * single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public SideEffectOptions.Builder setSummary(String summary) { + this.summary = summary; + return this; + } + + public SideEffectOptions build() { + return new SideEffectOptions(summary); + } + } + + private final String summary; + + private SideEffectOptions(String summary) { + this.summary = summary; + } + + public String getSummary() { + return summary; + } + + public SideEffectOptions.Builder toBuilder() { + return new SideEffectOptions.Builder(this); + } + + @Override + public String toString() { + return "SideEffectOptions{" + "summary='" + summary + '\'' + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SideEffectOptions that = (SideEffectOptions) o; + return Objects.equals(summary, that.summary); + } + + @Override + public int hashCode() { + return Objects.hash(summary); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 50412665cf..61c787757f 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -821,6 +821,118 @@ public static R sideEffect(Class resultClass, Type resultType, Func fu return WorkflowInternal.sideEffect(resultClass, resultType, func); } + /** + * Executes the provided function once, records its result into the workflow history. The recorded + * result on history will be returned without executing the provided function during replay. This + * guarantees the deterministic requirement for workflow as the exact same result will be returned + * in replay. Common use case is to run some short non-deterministic code in workflow, like + * getting random number. The only way to fail SideEffect is to panic which causes workflow task + * failure. The workflow task after timeout is rescheduled and re-executed giving SideEffect + * another chance to succeed. + * + *

Caution: do not use sideEffect function to modify any workflow state. Only use the + * SideEffect's return value. For example this code is BROKEN: + * + *


+   *  // Bad example:
+   *  AtomicInteger random = new AtomicInteger();
+   *  Workflow.sideEffect(() -> {
+   *         random.set(random.nextInt(100));
+   *         return null;
+   *  });
+   *  // random will always be 0 in replay, thus this code is non-deterministic
+   *  if random.get() < 50 {
+   *         ....
+   *  } else {
+   *         ....
+   *  }
+   * 
+ * + * On replay the provided function is not executed, the random will always be 0, and the workflow + * could take a different path breaking the determinism. + * + *

Here is the correct way to use sideEffect: + * + *


+   *  // Good example:
+   *  int random = Workflow.sideEffect(Integer.class, () -> random.nextInt(100));
+   *  if random < 50 {
+   *         ....
+   *  } else {
+   *         ....
+   *  }
+   * 
+ * + * If function throws any exception it is not delivered to the workflow code. It is wrapped in + * {@link Error} causing failure of the current workflow task. + * + * @param resultClass class of the side effect + * @param func function that returns side effect value + * @param options side effect options + * @return value of the side effect + * @see #mutableSideEffect(String, Class, BiPredicate, Functions.Func) + */ + public static R sideEffect(Class resultClass, Func func, SideEffectOptions options) { + return WorkflowInternal.sideEffect(resultClass, resultClass, func, options); + } + + /** + * Executes the provided function once, records its result into the workflow history. The recorded + * result on history will be returned without executing the provided function during replay. This + * guarantees the deterministic requirement for workflow as the exact same result will be returned + * in replay. Common use case is to run some short non-deterministic code in workflow, like + * getting random number. The only way to fail SideEffect is to panic which causes workflow task + * failure. The workflow task after timeout is rescheduled and re-executed giving SideEffect + * another chance to succeed. + * + *

Caution: do not use sideEffect function to modify any workflow state. Only use the + * SideEffect's return value. For example this code is BROKEN: + * + *


+   *  // Bad example:
+   *  AtomicInteger random = new AtomicInteger();
+   *  Workflow.sideEffect(() -> {
+   *         random.set(random.nextInt(100));
+   *         return null;
+   *  });
+   *  // random will always be 0 in replay, thus this code is non-deterministic
+   *  if random.get() < 50 {
+   *         ....
+   *  } else {
+   *         ....
+   *  }
+   * 
+ * + * On replay the provided function is not executed, the random will always be 0, and the workflow + * could take a different path breaking the determinism. + * + *

Here is the correct way to use sideEffect: + * + *


+   *  // Good example:
+   *  int random = Workflow.sideEffect(Integer.class, () -> random.nextInt(100));
+   *  if random < 50 {
+   *         ....
+   *  } else {
+   *         ....
+   *  }
+   * 
+ * + * If function throws any exception it is not delivered to the workflow code. It is wrapped in + * {@link Error} causing failure of the current workflow task. + * + * @param resultClass class of the side effect + * @param resultType type of the side effect. Differs from resultClass for generic types. + * @param func function that returns side effect value + * @param options side effect options + * @return value of the side effect + * @see #mutableSideEffect(String, Class, BiPredicate, Functions.Func) + */ + public static R sideEffect( + Class resultClass, Type resultType, Func func, SideEffectOptions options) { + return WorkflowInternal.sideEffect(resultClass, resultType, func, options); + } + /** * {@code mutableSideEffect} is similar to {@link #sideEffect(Class, Functions.Func)} in allowing * calls of non-deterministic functions from workflow code. @@ -890,6 +1002,86 @@ public static R mutableSideEffect( return WorkflowInternal.mutableSideEffect(id, resultClass, resultType, updated, func); } + /** + * {@code mutableSideEffect} is similar to {@link #sideEffect(Class, Functions.Func)} in allowing + * calls of non-deterministic functions from workflow code. + * + *

The difference between {@code mutableSideEffect} and {@link #sideEffect(Class, + * Functions.Func)} is that every new {@code sideEffect} call in non-replay mode results in a new + * marker event recorded into the history. However, {@code mutableSideEffect} only records a new + * marker if a value has changed. During the replay, {@code mutableSideEffect} will not execute + * the function again, but it will return the exact same value as it was returning during the + * non-replay run. + * + *

One good use case of {@code mutableSideEffect} is to access a dynamically changing config + * without breaking determinism. Even if called very frequently the config value is recorded only + * when it changes not causing any performance degradation due to a large history size. + * + *

Caution: do not use {@code mutableSideEffect} function to modify any workflow state. Only + * use the mutableSideEffect's return value. + * + *

If function throws any exception it is not delivered to the workflow code. It is wrapped in + * {@link Error} causing failure of the current workflow task. + * + * @param id unique identifier of this side effect + * @param updated used to decide if a new value should be recorded. A func result is recorded only + * if call to updated with stored and a new value as arguments returns true. It is not called + * for the first value. + * @param resultClass class of the side effect + * @param func function that produces a value. This function can contain non-deterministic code. + * @param options options for mutable side effect + * @see #sideEffect(Class, Functions.Func) + */ + public static R mutableSideEffect( + String id, + Class resultClass, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + return WorkflowInternal.mutableSideEffect(id, resultClass, resultClass, updated, func, options); + } + + /** + * {@code mutableSideEffect} is similar to {@link #sideEffect(Class, Functions.Func)} in allowing + * calls of non-deterministic functions from workflow code. + * + *

The difference between {@code mutableSideEffect} and {@link #sideEffect(Class, + * Functions.Func)} is that every new {@code sideEffect} call in non-replay mode results in a new + * marker event recorded into the history. However, {@code mutableSideEffect} only records a new + * marker if a value has changed. During the replay, {@code mutableSideEffect} will not execute + * the function again, but it will return the exact same value as it was returning during the + * non-replay run. + * + *

One good use case of {@code mutableSideEffect} is to access a dynamically changing config + * without breaking determinism. Even if called very frequently the config value is recorded only + * when it changes not causing any performance degradation due to a large history size. + * + *

Caution: do not use {@code mutableSideEffect} function to modify any workflow state. Only + * use the mutableSideEffect's return value. + * + *

If function throws any exception it is not delivered to the workflow code. It is wrapped in + * {@link Error} causing failure of the current workflow task. + * + * @param id unique identifier of this side effect + * @param updated used to decide if a new value should be recorded. A func result is recorded only + * if call to updated with stored and a new value as arguments returns true. It is not called + * for the first value. + * @param resultClass class of the side effect + * @param resultType type of the side effect. Differs from resultClass for generic types. + * @param func function that produces a value. This function can contain non-deterministic code. + * @param options options for mutable side effect + * @see #sideEffect(Class, Functions.Func) + */ + public static R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + return WorkflowInternal.mutableSideEffect(id, resultClass, resultType, updated, func, options); + } + /** * {@code getVersion} is used to safely perform backwards incompatible changes to workflow * definitions. It is not allowed to update workflow code while there are workflows running as it diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java index 5f53c1b435..e65d900d09 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java @@ -72,7 +72,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { .>add1( (v, c) -> stateMachines.mutableSideEffect( - "id1", (p) -> converter.toPayloads("result1"), c)) + "id1", null, (p) -> converter.toPayloads("result1"), c)) .add((r) -> stateMachines.completeWorkflow(r)); } } @@ -129,13 +129,13 @@ class TestListener extends TestEntityManagerListenerBase { protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .>add1( - (v, c) -> stateMachines.mutableSideEffect("id1", (p) -> Optional.empty(), c)) + (v, c) -> stateMachines.mutableSideEffect("id1", null, (p) -> Optional.empty(), c)) .>add1( - (v, c) -> stateMachines.mutableSideEffect("id1", (p) -> Optional.empty(), c)) + (v, c) -> stateMachines.mutableSideEffect("id1", null, (p) -> Optional.empty(), c)) .>add1( (v, c) -> stateMachines.mutableSideEffect( - "id1", (p) -> converter.toPayloads("result1"), c)) + "id1", null, (p) -> converter.toPayloads("result1"), c)) .add((r) -> stateMachines.completeWorkflow(r)); } } @@ -205,9 +205,9 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { .>add1( (v, c) -> stateMachines.mutableSideEffect( - "id1", (p) -> converter.toPayloads("result1"), c)) + "id1", null, (p) -> converter.toPayloads("result1"), c)) .>add1( - (v, c) -> stateMachines.mutableSideEffect("id1", (p) -> Optional.empty(), c)) + (v, c) -> stateMachines.mutableSideEffect("id1", null, (p) -> Optional.empty(), c)) .add1( (v, c) -> stateMachines.newTimer( @@ -225,11 +225,11 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { null, c)) .>add1( - (v, c) -> stateMachines.mutableSideEffect("id1", (p) -> Optional.empty(), c)) + (v, c) -> stateMachines.mutableSideEffect("id1", null, (p) -> Optional.empty(), c)) .>add1( (v, c) -> stateMachines.mutableSideEffect( - "id1", (p) -> converter.toPayloads("result2"), c)) + "id1", null, (p) -> converter.toPayloads("result2"), c)) .add((r) -> stateMachines.completeWorkflow(r)); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/SideEffectStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/SideEffectStateMachineTest.java index e99c486b60..8459ceb49d 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/SideEffectStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/SideEffectStateMachineTest.java @@ -69,7 +69,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { builder .>add1( (v, c) -> - stateMachines.sideEffect(() -> converter.toPayloads("m1Arg1", "m1Arg2"), c)) + stateMachines.sideEffect( + () -> converter.toPayloads("m1Arg1", "m1Arg2"), null, c)) .>add1((r, c) -> result = r); } @@ -80,7 +81,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder build builder .>add1( (r, c) -> { - stateMachines.sideEffect(() -> converter.toPayloads("m2Arg1"), c); + stateMachines.sideEffect(() -> converter.toPayloads("m2Arg1"), null, c); }) .add((r) -> stateMachines.completeWorkflow(Optional.empty())); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java index 3d7d6c7266..f184ca2874 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java @@ -251,7 +251,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) .>add1( (v, c) -> { message.getCallbacks().accept(); - stateMachines.mutableSideEffect("id1", (p) -> converter.toPayloads("result1"), c); + stateMachines.mutableSideEffect( + "id1", null, (p) -> converter.toPayloads("result1"), c); }) .add( (r) -> { @@ -673,7 +674,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) .>add1( (v, c) -> { message.getCallbacks().accept(); - stateMachines.mutableSideEffect("id1", (p) -> converter.toPayloads("result1"), c); + stateMachines.mutableSideEffect( + "id1", null, (p) -> converter.toPayloads("result1"), c); }) .add( (r) -> { @@ -906,7 +908,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder build "signal1", signalEvent.getWorkflowExecutionSignaledEventAttributes().getSignalName()); builder.>add1( (r, c) -> { - stateMachines.sideEffect(() -> converter.toPayloads("m2Arg1"), c); + stateMachines.sideEffect(() -> converter.toPayloads("m2Arg1"), null, c); }); } diff --git a/temporal-sdk/src/test/java/io/temporal/testUtils/HistoryUtils.java b/temporal-sdk/src/test/java/io/temporal/testUtils/HistoryUtils.java index b846b45a85..36f525f204 100644 --- a/temporal-sdk/src/test/java/io/temporal/testUtils/HistoryUtils.java +++ b/temporal-sdk/src/test/java/io/temporal/testUtils/HistoryUtils.java @@ -3,8 +3,11 @@ import static io.temporal.internal.common.InternalUtils.createNormalTaskQueue; import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue; import static io.temporal.testing.internal.TestServiceUtils.*; +import static org.junit.Assert.assertEquals; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; +import io.temporal.common.converter.DefaultDataConverter; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.testserver.TestServer; @@ -82,4 +85,19 @@ public static PollWorkflowTaskQueueResponse generateWorkflowTaskWithPartialHisto return pollWorkflowTaskQueue( namespace, createStickyTaskQueue(stickyTaskQueueName, normalTaskQueueName), service); } + + public static void assertEventMetadata(HistoryEvent event, String summary, String details) { + if (summary != null) { + String describedSummary = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getSummary(), String.class, String.class); + assertEquals(summary, describedSummary); + } + if (details != null) { + String describedDetails = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getDetails(), String.class, String.class); + assertEquals(details, describedDetails); + } + } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MutableSideEffectTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MutableSideEffectTest.java index fd307c2011..2518f164fb 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MutableSideEffectTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MutableSideEffectTest.java @@ -1,14 +1,21 @@ package io.temporal.workflow; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import java.time.Duration; import java.util.*; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; public class MutableSideEffectTest { + static final String mutableSideEffectSummary = "mutable-side-effect-summary"; private static final Map> mutableSideEffectValue = Collections.synchronizedMap(new HashMap<>()); @@ -35,6 +42,15 @@ public void testMutableSideEffect() { mutableSideEffectValue.put(testWorkflowRule.getTaskQueue(), values); String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); Assert.assertEquals("1234, 1234, 1234, 3456, 3456, 4234, 4234, 4234", result); + + WorkflowExecution exec = WorkflowStub.fromTyped(workflowStub).getExecution(); + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(exec.getWorkflowId()); + List sideEffectMarkerEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasMarkerRecordedEventAttributes) + .collect(Collectors.toList()); + HistoryUtils.assertEventMetadata(sideEffectMarkerEvents.get(0), mutableSideEffectSummary, null); } public static class TestMutableSideEffectWorkflowImpl implements TestWorkflow1 { @@ -49,7 +65,10 @@ public String execute(String taskQueue) { "id1", Long.class, (o, n) -> n > o, - () -> mutableSideEffectValue.get(taskQueue).poll()); + () -> mutableSideEffectValue.get(taskQueue).poll(), + MutableSideEffectOptions.newBuilder() + .setSummary(mutableSideEffectSummary) + .build()); if (result.length() > 0) { result.append(", "); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/SideEffectTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/SideEffectTest.java index 1d661e454f..5ffc5aa89d 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/SideEffectTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/SideEffectTest.java @@ -2,6 +2,11 @@ import static org.junit.Assert.assertEquals; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testing.internal.TracingWorkerInterceptor; @@ -9,11 +14,14 @@ import io.temporal.workflow.shared.TestActivities.VariousTestActivities; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; public class SideEffectTest { + static final String sideEffectSummary = "side-effect-summary"; @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -39,6 +47,15 @@ public void testSideEffect() { "sleep PT1S", "executeActivity customActivity1", "activity customActivity1"); + + WorkflowExecution exec = WorkflowStub.fromTyped(workflowStub).getExecution(); + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(exec.getWorkflowId()); + List sideEffectMarkerEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasMarkerRecordedEventAttributes) + .collect(Collectors.toList()); + HistoryUtils.assertEventMetadata(sideEffectMarkerEvents.get(0), sideEffectSummary, null); } public static class TestSideEffectWorkflowImpl implements TestWorkflow1 { @@ -51,7 +68,11 @@ public String execute(String taskQueue) { SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue)); long workflowTime = Workflow.currentTimeMillis(); - long time1 = Workflow.sideEffect(long.class, () -> workflowTime); + long time1 = + Workflow.sideEffect( + long.class, + () -> workflowTime, + SideEffectOptions.newBuilder().setSummary(sideEffectSummary).build()); long time2 = Workflow.sideEffect(long.class, () -> workflowTime); assertEquals(time1, time2); Workflow.sleep(Duration.ofSeconds(1)); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityMetadataTest.java index 29fe19c49d..62017c460c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityMetadataTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityMetadataTest.java @@ -1,13 +1,11 @@ package io.temporal.workflow.activityTests; -import static org.junit.Assert.assertEquals; - import io.temporal.activity.ActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowStub; import io.temporal.common.WorkflowExecutionHistory; -import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.Workflow; import io.temporal.workflow.shared.TestActivities; @@ -41,22 +39,7 @@ public void testActivityWithMetaData() { workflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasActivityTaskScheduledEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(activityScheduledEvents.get(0), activitySummary, null); - } - - private void assertEventMetadata(HistoryEvent event, String summary, String details) { - if (summary != null) { - String describedSummary = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getSummary(), String.class, String.class); - assertEquals(summary, describedSummary); - } - if (details != null) { - String describedDetails = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getDetails(), String.class, String.class); - assertEquals(details, describedDetails); - } + HistoryUtils.assertEventMetadata(activityScheduledEvents.get(0), activitySummary, null); } public static class TestWorkflowImpl implements TestWorkflow1 { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java index aa1048beb5..235d40ae66 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java @@ -1,13 +1,11 @@ package io.temporal.workflow.activityTests; -import static org.junit.Assert.assertEquals; - import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowStub; import io.temporal.common.WorkflowExecutionHistory; -import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.Workflow; import io.temporal.workflow.shared.TestActivities; @@ -41,22 +39,8 @@ public void testLocalActivityWithMetaData() { workflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasMarkerRecordedEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(localActivityScheduledEvents.get(0), localActivitySummary, null); - } - - private void assertEventMetadata(HistoryEvent event, String summary, String details) { - if (summary != null) { - String describedSummary = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getSummary(), String.class, String.class); - assertEquals(summary, describedSummary); - } - if (details != null) { - String describedDetails = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getDetails(), String.class, String.class); - assertEquals(details, describedDetails); - } + HistoryUtils.assertEventMetadata( + localActivityScheduledEvents.get(0), localActivitySummary, null); } public static class TestWorkflowImpl implements TestWorkflow1 { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java index 2ca95016dc..c235300892 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java @@ -8,7 +8,7 @@ import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.common.WorkflowExecutionHistory; -import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.TimerOptions; @@ -57,7 +57,7 @@ public void testChildWorkflowWithMetaData() { workflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(workflowStartedEvents.get(0), summary, details); + HistoryUtils.assertEventMetadata(workflowStartedEvents.get(0), summary, details); assertWorkflowMetadata(childWorkflowId, childSummary, childDetails); @@ -67,13 +67,13 @@ public void testChildWorkflowWithMetaData() { childWorkflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(childWorkflowStartedEvents.get(0), childSummary, childDetails); + HistoryUtils.assertEventMetadata(childWorkflowStartedEvents.get(0), childSummary, childDetails); List timerStartedEvents = childWorkflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasTimerStartedEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(timerStartedEvents.get(0), childTimerSummary, null); + HistoryUtils.assertEventMetadata(timerStartedEvents.get(0), childTimerSummary, null); } private void assertWorkflowMetadata(String workflowId, String summary, String details) { @@ -83,21 +83,6 @@ private void assertWorkflowMetadata(String workflowId, String summary, String de assertEquals(details, describe.getStaticDetails()); } - private void assertEventMetadata(HistoryEvent event, String summary, String details) { - if (summary != null) { - String describedSummary = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getSummary(), String.class, String.class); - assertEquals(summary, describedSummary); - } - if (details != null) { - String describedDetails = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getDetails(), String.class, String.class); - assertEquals(details, describedDetails); - } - } - public static class TestParentWorkflow implements TestWorkflow1 { private final ITestChild child1 = diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationMetadataTest.java index 9cf6bc70b8..1ef17fea39 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationMetadataTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationMetadataTest.java @@ -1,7 +1,5 @@ package io.temporal.workflow.nexus; -import static org.junit.Assert.assertEquals; - import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; @@ -9,7 +7,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowStub; import io.temporal.common.WorkflowExecutionHistory; -import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testUtils.HistoryUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.NexusOperationOptions; import io.temporal.workflow.NexusServiceOptions; @@ -46,22 +44,8 @@ public void testOperationSummary() { workflowExecutionHistory.getEvents().stream() .filter(HistoryEvent::hasNexusOperationScheduledEventAttributes) .collect(Collectors.toList()); - assertEventMetadata(nexusOperationScheduledEvents.get(0), NEXUS_OPERATION_SUMMARY, null); - } - - private void assertEventMetadata(HistoryEvent event, String summary, String details) { - if (summary != null) { - String describedSummary = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getSummary(), String.class, String.class); - assertEquals(summary, describedSummary); - } - if (details != null) { - String describedDetails = - DefaultDataConverter.STANDARD_INSTANCE.fromPayload( - event.getUserMetadata().getDetails(), String.class, String.class); - assertEquals(details, describedDetails); - } + HistoryUtils.assertEventMetadata( + nexusOperationScheduledEvents.get(0), NEXUS_OPERATION_SUMMARY, null); } public static class TestNexus implements TestWorkflows.TestWorkflow1 { diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 8cb818b751..9f7aa44b6d 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -253,13 +253,16 @@ public void run() { @Override public void sideEffect( - Functions.Func> func, Functions.Proc1> callback) { + Functions.Func> func, + UserMetadata userMetadata, + Functions.Proc1> callback) { callback.apply(func.apply()); } @Override public void mutableSideEffect( String id, + UserMetadata userMetadata, Functions.Func1, Optional> func, Functions.Proc1> callback) { callback.apply(func.apply(Optional.empty())); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index a266c45f22..1221dec557 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -37,11 +37,8 @@ import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.WorkerOptions; -import io.temporal.workflow.Functions; +import io.temporal.workflow.*; import io.temporal.workflow.Functions.Func; -import io.temporal.workflow.Promise; -import io.temporal.workflow.TimerOptions; -import io.temporal.workflow.Workflow; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; @@ -412,12 +409,29 @@ public R sideEffect(Class resultClass, Type resultType, Func func) { throw new UnsupportedOperationException("not implemented"); } + @Override + public R sideEffect( + Class resultClass, Type resultType, Func func, SideEffectOptions options) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public R mutableSideEffect( String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { throw new UnsupportedOperationException("not implemented"); } + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Func func, + MutableSideEffectOptions options) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public int getVersion(String changeId, int minSupported, int maxSupported) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 8dfcd48b69..77d82bdaee 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -134,8 +134,7 @@ List getImpl() { } } - private static class TracingWorkflowOutboundCallsInterceptor - implements WorkflowOutboundCallsInterceptor { + static class TracingWorkflowOutboundCallsInterceptor implements WorkflowOutboundCallsInterceptor { private final FilteredTrace trace; private final WorkflowOutboundCallsInterceptor next; @@ -258,6 +257,15 @@ public R sideEffect(Class resultClass, Type resultType, Functions.Func return next.sideEffect(resultClass, resultType, func); } + @Override + public R sideEffect( + Class resultClass, Type resultType, Functions.Func func, SideEffectOptions options) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("sideEffect"); + } + return next.sideEffect(resultClass, resultType, func, options); + } + @Override public R mutableSideEffect( String id, @@ -271,6 +279,20 @@ public R mutableSideEffect( return next.mutableSideEffect(id, resultClass, resultType, updated, func); } + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Functions.Func func, + MutableSideEffectOptions options) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("mutableSideEffect"); + } + return next.mutableSideEffect(id, resultClass, resultType, updated, func, options); + } + @Override public int getVersion(String changeId, int minSupported, int maxSupported) { if (!WorkflowUnsafe.isReplaying()) {